From 349721ac49c21fb2df09ba29c486a47719bd23e5 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Wed, 16 Jul 2025 14:27:35 +0200 Subject: [PATCH 1/7] Add registry partition_by: :pid | :key option --- lib/elixir/lib/registry.ex | 172 ++++++++++++++++------- lib/elixir/test/elixir/registry_test.exs | 30 +++- 2 files changed, 153 insertions(+), 49 deletions(-) diff --git a/lib/elixir/lib/registry.ex b/lib/elixir/lib/registry.ex index d35f82bc12a..522cab40082 100644 --- a/lib/elixir/lib/registry.ex +++ b/lib/elixir/lib/registry.ex @@ -231,6 +231,7 @@ defmodule Registry do | {:partitions, pos_integer} | {:listeners, [atom]} | {:meta, [{meta_key, meta_value}]} + | {:partition_by, :key | :pid} @typedoc """ The message that the registry sends to listeners when a process registers or unregisters. @@ -255,7 +256,7 @@ defmodule Registry do defp whereis_name(registry, key) do case key_info!(registry) do - {:unique, partitions, key_ets} -> + {:unique, partitions, key_ets, _} -> key_ets = key_ets || key_ets!(registry, key, partitions) case safe_lookup_second(key_ets, key) do @@ -266,7 +267,7 @@ defmodule Registry do :undefined end - {kind, _, _} -> + {kind, _, _, _} -> raise ArgumentError, ":via is not supported for #{kind} registries" end end @@ -329,6 +330,20 @@ defmodule Registry do {Registry, keys: :unique, name: MyApp.Registry, partitions: System.schedulers_online()} ], strategy: :one_for_one) + For `:duplicate` registries with many different keys (e.g., many topics with + few subscribers each), you can optimize key-based lookups by partitioning by key: + + Registry.start_link( + keys: :duplicate, + name: MyApp.TopicRegistry, + partitions: System.schedulers_online(), + partition_by: :key + ) + + This allows key-based lookups to check only a single partition instead of + searching all partitions. Use the default `:pid` partitioning when you have + fewer keys with many entries each (e.g., one topic with many subscribers). + ## Options The registry requires the following keys: @@ -344,6 +359,19 @@ defmodule Registry do listener if the listener wants to be notified if the registered process crashes. Messages sent to listeners are of type `t:listener_message/0`. * `:meta` - a keyword list of metadata to be attached to the registry. + * `:partition_by` - the partitioning strategy for `:duplicate` registries. + Can be `:key` or `:pid`. Defaults to `:pid`. + + Use `:pid` (default) when you have keys with many entries (e.g., one topic + with many subscribers). This is the traditional behavior and groups all + entries from the same process together. + + Use `:key` when entries are spread across many different keys (e.g., many + topics with few subscribers each). This makes key-based lookups more + efficient as they only need to check a single partition instead of all + partitions. + + Only supported for `:duplicate` registries. """ @doc since: "1.5.0" @@ -388,6 +416,17 @@ defmodule Registry do "expected :listeners to be a list of named processes, got: #{inspect(listeners)}" end + partition_by = Keyword.get(options, :partition_by, :pid) + + if partition_by not in [:key, :pid] do + raise ArgumentError, + "expected :partition_by to be :key or :pid, got: #{inspect(partition_by)}" + end + + if keys == :unique and partition_by == :key do + raise ArgumentError, ":partition_by :key is only supported for :duplicate registries" + end + compressed = Keyword.get(options, :compressed, false) if not is_boolean(compressed) do @@ -397,11 +436,19 @@ defmodule Registry do # The @info format must be kept in sync with Registry.Partition optimization. entries = [ - {@all_info, {keys, partitions, nil, nil, listeners}}, - {@key_info, {keys, partitions, nil}} | meta + {@all_info, {keys, partitions, nil, nil, listeners, partition_by}}, + {@key_info, {keys, partitions, nil, partition_by}} | meta ] - Registry.Supervisor.start_link(keys, name, partitions, listeners, entries, compressed) + Registry.Supervisor.start_link( + keys, + name, + partitions, + listeners, + entries, + compressed, + partition_by + ) end @doc false @@ -450,7 +497,7 @@ defmodule Registry do {new_value :: term, old_value :: term} | :error def update_value(registry, key, callback) when is_atom(registry) and is_function(callback, 1) do case key_info!(registry) do - {:unique, partitions, key_ets} -> + {:unique, partitions, key_ets, _} -> key_ets = key_ets || key_ets!(registry, key, partitions) try do @@ -467,7 +514,7 @@ defmodule Registry do :error end - {kind, _, _} -> + {kind, _, _, _} -> raise ArgumentError, "Registry.update_value/3 is not supported for #{kind} registries" end end @@ -502,18 +549,18 @@ defmodule Registry do when is_atom(registry) and is_function(mfa_or_fun, 1) when is_atom(registry) and tuple_size(mfa_or_fun) == 3 do case key_info!(registry) do - {:unique, partitions, key_ets} -> + {:unique, partitions, key_ets, _} -> (key_ets || key_ets!(registry, key, partitions)) |> safe_lookup_second(key) |> List.wrap() |> apply_non_empty_to_mfa_or_fun(mfa_or_fun) - {:duplicate, 1, key_ets} -> + {:duplicate, 1, key_ets, _} -> key_ets |> safe_lookup_second(key) |> apply_non_empty_to_mfa_or_fun(mfa_or_fun) - {:duplicate, partitions, _} -> + {:duplicate, partitions, _, _} -> if Keyword.get(opts, :parallel, false) do registry |> dispatch_parallel(key, mfa_or_fun, partitions) @@ -614,7 +661,7 @@ defmodule Registry do @spec lookup(registry, key) :: [{pid, value}] def lookup(registry, key) when is_atom(registry) do case key_info!(registry) do - {:unique, partitions, key_ets} -> + {:unique, partitions, key_ets, _} -> key_ets = key_ets || key_ets!(registry, key, partitions) case safe_lookup_second(key_ets, key) do @@ -625,10 +672,14 @@ defmodule Registry do [] end - {:duplicate, 1, key_ets} -> + {:duplicate, 1, key_ets, _} -> safe_lookup_second(key_ets, key) - {:duplicate, partitions, _key_ets} -> + {:duplicate, partitions, _key_ets, :key} -> + partition = hash(key, partitions) + safe_lookup_second(key_ets!(registry, partition), key) + + {:duplicate, partitions, _key_ets, :pid} -> for partition <- 0..(partitions - 1), pair <- safe_lookup_second(key_ets!(registry, partition), key), do: pair @@ -689,7 +740,7 @@ defmodule Registry do @doc since: "1.18.0" def lock(registry, lock_key, function) when is_atom(registry) and is_function(function, 0) do - {_kind, partitions, _, pid_ets, _} = info!(registry) + {_kind, partitions, _, pid_ets, _, _} = info!(registry) {pid_server, _pid_ets} = pid_ets || pid_ets!(registry, lock_key, partitions) Registry.Partition.lock(pid_server, lock_key, function) end @@ -745,14 +796,14 @@ defmodule Registry do spec = [{{:_, {:_, pattern}}, guards, [{:element, 2, :"$_"}]}] case key_info!(registry) do - {:unique, partitions, key_ets} -> + {:unique, partitions, key_ets, _} -> key_ets = key_ets || key_ets!(registry, key, partitions) :ets.select(key_ets, spec) - {:duplicate, 1, key_ets} -> + {:duplicate, 1, key_ets, _} -> :ets.select(key_ets, spec) - {:duplicate, partitions, _key_ets} -> + {:duplicate, partitions, _key_ets, _} -> for partition <- 0..(partitions - 1), pair <- :ets.select(key_ets!(registry, partition), spec), do: pair @@ -794,7 +845,7 @@ defmodule Registry do @doc since: "1.4.0" @spec keys(registry, pid) :: [key] def keys(registry, pid) when is_atom(registry) and is_pid(pid) do - {kind, partitions, _, pid_ets, _} = info!(registry) + {kind, partitions, _, pid_ets, _, _} = info!(registry) {_, pid_ets} = pid_ets || pid_ets!(registry, pid, partitions) keys = @@ -871,7 +922,7 @@ defmodule Registry do @spec values(registry, key, pid) :: [value] def values(registry, key, pid) when is_atom(registry) do case key_info!(registry) do - {:unique, partitions, key_ets} -> + {:unique, partitions, key_ets, _} -> key_ets = key_ets || key_ets!(registry, key, partitions) case safe_lookup_second(key_ets, key) do @@ -882,8 +933,16 @@ defmodule Registry do [] end - {:duplicate, partitions, key_ets} -> - key_ets = key_ets || key_ets!(registry, pid, partitions) + {:duplicate, 1, key_ets, _} -> + for {^pid, value} <- safe_lookup_second(key_ets, key), do: value + + {:duplicate, partitions, _key_ets, :key} -> + partition = hash(key, partitions) + key_ets = key_ets!(registry, partition) + for {^pid, value} <- safe_lookup_second(key_ets, key), do: value + + {:duplicate, partitions, _key_ets, :pid} -> + key_ets = key_ets!(registry, pid, partitions) for {^pid, value} <- safe_lookup_second(key_ets, key), do: value end end @@ -930,8 +989,8 @@ defmodule Registry do @spec unregister(registry, key) :: :ok def unregister(registry, key) when is_atom(registry) do self = self() - {kind, partitions, key_ets, pid_ets, listeners} = info!(registry) - {key_partition, pid_partition} = partitions(kind, key, self, partitions) + {kind, partitions, key_ets, pid_ets, listeners, partition_by} = info!(registry) + {key_partition, pid_partition} = partitions(kind, key, self, partitions, partition_by) key_ets = key_ets || key_ets!(registry, key_partition) {pid_server, pid_ets} = pid_ets || pid_ets!(registry, pid_partition) @@ -993,8 +1052,8 @@ defmodule Registry do def unregister_match(registry, key, pattern, guards \\ []) when is_list(guards) do self = self() - {kind, partitions, key_ets, pid_ets, listeners} = info!(registry) - {key_partition, pid_partition} = partitions(kind, key, self, partitions) + {kind, partitions, key_ets, pid_ets, listeners, partition_by} = info!(registry) + {key_partition, pid_partition} = partitions(kind, key, self, partitions, partition_by) key_ets = key_ets || key_ets!(registry, key_partition) {pid_server, pid_ets} = pid_ets || pid_ets!(registry, pid_partition) @@ -1089,8 +1148,8 @@ defmodule Registry do @spec register(registry, key, value) :: {:ok, pid} | {:error, {:already_registered, pid}} def register(registry, key, value) when is_atom(registry) do self = self() - {kind, partitions, key_ets, pid_ets, listeners} = info!(registry) - {key_partition, pid_partition} = partitions(kind, key, self, partitions) + {kind, partitions, key_ets, pid_ets, listeners, partition_by} = info!(registry) + {key_partition, pid_partition} = partitions(kind, key, self, partitions, partition_by) key_ets = key_ets || key_ets!(registry, key_partition) {pid_server, pid_ets} = pid_ets || pid_ets!(registry, pid_partition) @@ -1266,12 +1325,14 @@ defmodule Registry do @spec count(registry) :: non_neg_integer() def count(registry) when is_atom(registry) do case key_info!(registry) do - {_kind, partitions, nil} -> - Enum.sum_by(0..(partitions - 1), fn partition_index -> + {_kind, partitions, nil, _} -> + 0..(partitions - 1) + |> Enum.map(fn partition_index -> safe_size(key_ets!(registry, partition_index)) end) + |> Enum.sum() - {_kind, 1, key_ets} -> + {_kind, 1, key_ets, _} -> safe_size(key_ets) end end @@ -1335,17 +1396,19 @@ defmodule Registry do spec = [{{:_, {:_, pattern}}, guards, [true]}] case key_info!(registry) do - {:unique, partitions, key_ets} -> + {:unique, partitions, key_ets, _} -> key_ets = key_ets || key_ets!(registry, key, partitions) :ets.select_count(key_ets, spec) - {:duplicate, 1, key_ets} -> + {:duplicate, 1, key_ets, _} -> :ets.select_count(key_ets, spec) - {:duplicate, partitions, _key_ets} -> - Enum.sum_by(0..(partitions - 1), fn partition_index -> + {:duplicate, partitions, _key_ets, _} -> + 0..(partitions - 1) + |> Enum.map(fn partition_index -> :ets.select_count(key_ets!(registry, partition_index), spec) end) + |> Enum.sum() end end @@ -1404,12 +1467,12 @@ defmodule Registry do spec = group_match_headers(spec, __ENV__.function) case key_info!(registry) do - {_kind, partitions, nil} -> + {_kind, partitions, nil, _} -> Enum.flat_map(0..(partitions - 1), fn partition_index -> :ets.select(key_ets!(registry, partition_index), spec) end) - {_kind, 1, key_ets} -> + {_kind, 1, key_ets, _} -> :ets.select(key_ets, spec) end end @@ -1435,12 +1498,14 @@ defmodule Registry do spec = group_match_headers(spec, __ENV__.function) case key_info!(registry) do - {_kind, partitions, nil} -> - Enum.sum_by(0..(partitions - 1), fn partition_index -> + {_kind, partitions, nil, _} -> + 0..(partitions - 1) + |> Enum.map(fn partition_index -> :ets.select_count(key_ets!(registry, partition_index), spec) end) + |> Enum.sum() - {_kind, 1, key_ets} -> + {_kind, 1, key_ets, _} -> :ets.select_count(key_ets, spec) end end @@ -1508,11 +1573,16 @@ defmodule Registry do end end - defp partitions(:unique, key, pid, partitions) do + defp partitions(:unique, key, pid, partitions, _partition_by) do {hash(key, partitions), hash(pid, partitions)} end - defp partitions(:duplicate, _key, pid, partitions) do + defp partitions(:duplicate, key, _pid, partitions, :key) do + partition = hash(key, partitions) + {partition, partition} + end + + defp partitions(:duplicate, _key, pid, partitions, :pid) do partition = hash(pid, partitions) {partition, partition} end @@ -1546,12 +1616,12 @@ defmodule Registry.Supervisor do @moduledoc false use Supervisor - def start_link(kind, registry, partitions, listeners, entries, compressed) do - arg = {kind, registry, partitions, listeners, entries, compressed} + def start_link(kind, registry, partitions, listeners, entries, compressed, partition_by) do + arg = {kind, registry, partitions, listeners, entries, compressed, partition_by} Supervisor.start_link(__MODULE__, arg, name: registry) end - def init({kind, registry, partitions, listeners, entries, compressed}) do + def init({kind, registry, partitions, listeners, entries, compressed, partition_by}) do ^registry = :ets.new(registry, [:set, :public, :named_table, read_concurrency: true]) true = :ets.insert(registry, entries) @@ -1559,7 +1629,10 @@ defmodule Registry.Supervisor do for i <- 0..(partitions - 1) do key_partition = Registry.Partition.key_name(registry, i) pid_partition = Registry.Partition.pid_name(registry, i) - arg = {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed} + + arg = + {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed, + partition_by} %{ id: pid_partition, @@ -1631,7 +1704,10 @@ defmodule Registry.Partition do ## Callbacks - def init({kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed}) do + def init( + {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed, + partition_by} + ) do Process.flag(:trap_exit, true) key_ets = init_key_ets(kind, key_partition, compressed) pid_ets = init_pid_ets(kind, pid_partition) @@ -1640,8 +1716,8 @@ defmodule Registry.Partition do # is to write the table information alongside the registry info. if partitions == 1 do entries = [ - {@key_info, {kind, partitions, key_ets}}, - {@all_info, {kind, partitions, key_ets, {self(), pid_ets}, listeners}} + {@key_info, {kind, partitions, key_ets, partition_by}}, + {@all_info, {kind, partitions, key_ets, {self(), pid_ets}, listeners, partition_by}} ] true = :ets.insert(registry, entries) diff --git a/lib/elixir/test/elixir/registry_test.exs b/lib/elixir/test/elixir/registry_test.exs index d19264c3f97..bf4f5640883 100644 --- a/lib/elixir/test/elixir/registry_test.exs +++ b/lib/elixir/test/elixir/registry_test.exs @@ -931,6 +931,34 @@ defmodule Registry.Test do {{"world", :_, :_}, [], [true]} ]) end + + test "works with partition_by: :key", %{partitions: partitions} do + name = :"test_partition_by_keys_#{partitions}" + opts = [keys: :duplicate, name: name, partitions: partitions, partition_by: :key] + {:ok, _} = start_supervised({Registry, opts}) + + {:ok, _} = Registry.register(name, "hello", :value1) + {:ok, _} = Registry.register(name, "hello", :value2) + {:ok, _} = Registry.register(name, "world", :value3) + + assert 3 == Registry.count(name) + assert Registry.values(name, "hello", self()) |> Enum.sort() == [:value1, :value2] + assert Registry.values(name, "world", self()) == [:value3] + end + + test "works with partition_by: :pid", %{partitions: partitions} do + name = :"test_partition_by_pids_#{partitions}" + opts = [keys: :duplicate, name: name, partitions: partitions, partition_by: :pid] + {:ok, _} = start_supervised({Registry, opts}) + + {:ok, _} = Registry.register(name, "hello", :value1) + {:ok, _} = Registry.register(name, "hello", :value2) + {:ok, _} = Registry.register(name, "world", :value3) + + assert 3 == Registry.count(name) + assert Registry.values(name, "hello", self()) |> Enum.sort() == [:value1, :value2] + assert Registry.values(name, "world", self()) == [:value3] + end end # Note: those tests relies on internals @@ -958,7 +986,7 @@ defmodule Registry.Test do assert :ets.tab2list(pid) == [] end else - [{-1, {_, _, key, {partition, pid}, _}}] = :ets.lookup(registry, -1) + [{-1, {_, _, key, {partition, pid}, _, _}}] = :ets.lookup(registry, -1) GenServer.call(partition, :sync) assert :ets.tab2list(key) == [] assert :ets.tab2list(pid) == [] From e7d9496d7cbc0e6b99031e9e5bc52af014c29f44 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Wed, 16 Jul 2025 15:10:04 +0200 Subject: [PATCH 2/7] Extend the clean up test with partition_by values --- lib/elixir/test/elixir/registry_test.exs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/elixir/test/elixir/registry_test.exs b/lib/elixir/test/elixir/registry_test.exs index bf4f5640883..18fd5d0ee99 100644 --- a/lib/elixir/test/elixir/registry_test.exs +++ b/lib/elixir/test/elixir/registry_test.exs @@ -23,6 +23,7 @@ defmodule Registry.Test do listeners = List.wrap(config[:base_listener]) |> Enum.map(&:"#{&1}_#{partitions}") name = :"#{config.test}_#{partitions}" opts = [keys: keys, name: name, partitions: partitions, listeners: listeners] + opts = if config[:partition_by], do: opts ++ [partition_by: config.partition_by], else: opts {:ok, _} = start_supervised({Registry, opts}) %{registry: name, listeners: listeners} end @@ -962,9 +963,14 @@ defmodule Registry.Test do end # Note: those tests relies on internals - for keys <- [:unique, :duplicate] do + for {keys, partition_by} <- [ + {:unique, nil}, + {:duplicate, :pid}, + {:duplicate, :key} + ] do @tag keys: keys - test "clean up #{keys} registry on process crash", + @tag partition_by: partition_by + test "clean up #{keys} registry on process crash with partition_by: #{partition_by || "default"}", %{registry: registry, partitions: partitions} do {_, task1} = register_task(registry, "hello", :value) {_, task2} = register_task(registry, "world", :value) From 3959062b71e4152dc42b4855a85587698a1ae5ed Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Thu, 17 Jul 2025 12:44:08 +0200 Subject: [PATCH 3/7] Get rid of extra partition_by option, store partition strategy as part of {:duplicate, startegy} --- lib/elixir/lib/registry.ex | 180 +++++++++++------------ lib/elixir/test/elixir/registry_test.exs | 31 ++-- 2 files changed, 106 insertions(+), 105 deletions(-) diff --git a/lib/elixir/lib/registry.ex b/lib/elixir/lib/registry.ex index 522cab40082..9cda3d97567 100644 --- a/lib/elixir/lib/registry.ex +++ b/lib/elixir/lib/registry.ex @@ -187,7 +187,7 @@ defmodule Registry do Note that the registry uses one ETS table plus two ETS tables per partition. """ - @keys [:unique, :duplicate] + @keys [:unique, :duplicate, {:duplicate, :key}, {:duplicate, :pid}] @all_info -1 @key_info -2 @@ -195,7 +195,7 @@ defmodule Registry do @type registry :: atom @typedoc "The type of the registry" - @type keys :: :unique | :duplicate + @type keys :: :unique | :duplicate | {:duplicate, :key} | {:duplicate, :pid} @typedoc "The type of keys allowed on registration" @type key :: term @@ -231,7 +231,6 @@ defmodule Registry do | {:partitions, pos_integer} | {:listeners, [atom]} | {:meta, [{meta_key, meta_value}]} - | {:partition_by, :key | :pid} @typedoc """ The message that the registry sends to listeners when a process registers or unregisters. @@ -256,7 +255,7 @@ defmodule Registry do defp whereis_name(registry, key) do case key_info!(registry) do - {:unique, partitions, key_ets, _} -> + {:unique, partitions, key_ets} -> key_ets = key_ets || key_ets!(registry, key, partitions) case safe_lookup_second(key_ets, key) do @@ -267,8 +266,8 @@ defmodule Registry do :undefined end - {kind, _, _, _} -> - raise ArgumentError, ":via is not supported for #{kind} registries" + {{:duplicate, _}, _, _} -> + raise ArgumentError, ":via is not supported for duplicate registries" end end @@ -334,10 +333,9 @@ defmodule Registry do few subscribers each), you can optimize key-based lookups by partitioning by key: Registry.start_link( - keys: :duplicate, + keys: {:duplicate, :key}, name: MyApp.TopicRegistry, - partitions: System.schedulers_online(), - partition_by: :key + partitions: System.schedulers_online() ) This allows key-based lookups to check only a single partition instead of @@ -348,7 +346,7 @@ defmodule Registry do The registry requires the following keys: - * `:keys` - chooses if keys are `:unique` or `:duplicate` + * `:keys` - chooses if keys are `:unique`, `:duplicate`, `{:duplicate, :key}`, or `{:duplicate, :pid}` * `:name` - the name of the registry and its tables The following keys are optional: @@ -359,19 +357,18 @@ defmodule Registry do listener if the listener wants to be notified if the registered process crashes. Messages sent to listeners are of type `t:listener_message/0`. * `:meta` - a keyword list of metadata to be attached to the registry. - * `:partition_by` - the partitioning strategy for `:duplicate` registries. - Can be `:key` or `:pid`. Defaults to `:pid`. - Use `:pid` (default) when you have keys with many entries (e.g., one topic - with many subscribers). This is the traditional behavior and groups all - entries from the same process together. + For `:duplicate` registries, you can specify the partitioning strategy + directly in the `:keys` option: - Use `:key` when entries are spread across many different keys (e.g., many - topics with few subscribers each). This makes key-based lookups more - efficient as they only need to check a single partition instead of all - partitions. + * `:duplicate` or `{:duplicate, :pid}` - Use `:pid` partitioning (default) + when you have keys with many entries (e.g., one topic with many subscribers). + This is the traditional behavior and groups all entries from the same process together. - Only supported for `:duplicate` registries. + * `{:duplicate, :key}` - Use `:key` partitioning when entries are spread across + many different keys (e.g., many topics with few subscribers each). This makes + key-based lookups more efficient as they only need to check a single partition + instead of all partitions. """ @doc since: "1.5.0" @@ -379,10 +376,22 @@ defmodule Registry do def start_link(options) do keys = Keyword.get(options, :keys) - if keys not in @keys do - raise ArgumentError, - "expected :keys to be given and be one of :unique or :duplicate, got: #{inspect(keys)}" - end + # Validate and normalize keys format + kind = + case keys do + {:duplicate, partition_strategy} when partition_strategy in [:key, :pid] -> + {:duplicate, partition_strategy} + + :unique -> + :unique + + :duplicate -> + {:duplicate, :pid} + + _ -> + raise ArgumentError, + "expected :keys to be given and be one of :unique, :duplicate, {:duplicate, :key}, or {:duplicate, :pid}, got: #{inspect(keys)}" + end name = case Keyword.fetch(options, :name) do @@ -416,17 +425,6 @@ defmodule Registry do "expected :listeners to be a list of named processes, got: #{inspect(listeners)}" end - partition_by = Keyword.get(options, :partition_by, :pid) - - if partition_by not in [:key, :pid] do - raise ArgumentError, - "expected :partition_by to be :key or :pid, got: #{inspect(partition_by)}" - end - - if keys == :unique and partition_by == :key do - raise ArgumentError, ":partition_by :key is only supported for :duplicate registries" - end - compressed = Keyword.get(options, :compressed, false) if not is_boolean(compressed) do @@ -436,18 +434,17 @@ defmodule Registry do # The @info format must be kept in sync with Registry.Partition optimization. entries = [ - {@all_info, {keys, partitions, nil, nil, listeners, partition_by}}, - {@key_info, {keys, partitions, nil, partition_by}} | meta + {@all_info, {kind, partitions, nil, nil, listeners}}, + {@key_info, {kind, partitions, nil}} | meta ] Registry.Supervisor.start_link( - keys, + kind, name, partitions, listeners, entries, - compressed, - partition_by + compressed ) end @@ -497,7 +494,7 @@ defmodule Registry do {new_value :: term, old_value :: term} | :error def update_value(registry, key, callback) when is_atom(registry) and is_function(callback, 1) do case key_info!(registry) do - {:unique, partitions, key_ets, _} -> + {:unique, partitions, key_ets} -> key_ets = key_ets || key_ets!(registry, key, partitions) try do @@ -549,18 +546,18 @@ defmodule Registry do when is_atom(registry) and is_function(mfa_or_fun, 1) when is_atom(registry) and tuple_size(mfa_or_fun) == 3 do case key_info!(registry) do - {:unique, partitions, key_ets, _} -> + {:unique, partitions, key_ets} -> (key_ets || key_ets!(registry, key, partitions)) |> safe_lookup_second(key) |> List.wrap() |> apply_non_empty_to_mfa_or_fun(mfa_or_fun) - {:duplicate, 1, key_ets, _} -> + {{:duplicate, _}, 1, key_ets} -> key_ets |> safe_lookup_second(key) |> apply_non_empty_to_mfa_or_fun(mfa_or_fun) - {:duplicate, partitions, _, _} -> + {{:duplicate, _}, partitions, _} -> if Keyword.get(opts, :parallel, false) do registry |> dispatch_parallel(key, mfa_or_fun, partitions) @@ -661,7 +658,7 @@ defmodule Registry do @spec lookup(registry, key) :: [{pid, value}] def lookup(registry, key) when is_atom(registry) do case key_info!(registry) do - {:unique, partitions, key_ets, _} -> + {:unique, partitions, key_ets} -> key_ets = key_ets || key_ets!(registry, key, partitions) case safe_lookup_second(key_ets, key) do @@ -672,14 +669,14 @@ defmodule Registry do [] end - {:duplicate, 1, key_ets, _} -> + {{:duplicate, _}, 1, key_ets} -> safe_lookup_second(key_ets, key) - {:duplicate, partitions, _key_ets, :key} -> + {{:duplicate, :key}, partitions, _key_ets} -> partition = hash(key, partitions) safe_lookup_second(key_ets!(registry, partition), key) - {:duplicate, partitions, _key_ets, :pid} -> + {{:duplicate, :pid}, partitions, _key_ets} -> for partition <- 0..(partitions - 1), pair <- safe_lookup_second(key_ets!(registry, partition), key), do: pair @@ -740,7 +737,7 @@ defmodule Registry do @doc since: "1.18.0" def lock(registry, lock_key, function) when is_atom(registry) and is_function(function, 0) do - {_kind, partitions, _, pid_ets, _, _} = info!(registry) + {_kind, partitions, _, pid_ets, _} = info!(registry) {pid_server, _pid_ets} = pid_ets || pid_ets!(registry, lock_key, partitions) Registry.Partition.lock(pid_server, lock_key, function) end @@ -796,14 +793,14 @@ defmodule Registry do spec = [{{:_, {:_, pattern}}, guards, [{:element, 2, :"$_"}]}] case key_info!(registry) do - {:unique, partitions, key_ets, _} -> + {:unique, partitions, key_ets} -> key_ets = key_ets || key_ets!(registry, key, partitions) :ets.select(key_ets, spec) - {:duplicate, 1, key_ets, _} -> + {{:duplicate, _}, 1, key_ets} -> :ets.select(key_ets, spec) - {:duplicate, partitions, _key_ets, _} -> + {{:duplicate, _}, partitions, _key_ets} -> for partition <- 0..(partitions - 1), pair <- :ets.select(key_ets!(registry, partition), spec), do: pair @@ -845,7 +842,7 @@ defmodule Registry do @doc since: "1.4.0" @spec keys(registry, pid) :: [key] def keys(registry, pid) when is_atom(registry) and is_pid(pid) do - {kind, partitions, _, pid_ets, _, _} = info!(registry) + {kind, partitions, _, pid_ets, _} = info!(registry) {_, pid_ets} = pid_ets || pid_ets!(registry, pid, partitions) keys = @@ -922,7 +919,7 @@ defmodule Registry do @spec values(registry, key, pid) :: [value] def values(registry, key, pid) when is_atom(registry) do case key_info!(registry) do - {:unique, partitions, key_ets, _} -> + {:unique, partitions, key_ets} -> key_ets = key_ets || key_ets!(registry, key, partitions) case safe_lookup_second(key_ets, key) do @@ -933,16 +930,17 @@ defmodule Registry do [] end - {:duplicate, 1, key_ets, _} -> + {{:duplicate, _}, 1, key_ets} -> for {^pid, value} <- safe_lookup_second(key_ets, key), do: value - {:duplicate, partitions, _key_ets, :key} -> + {{:duplicate, :key}, partitions, _key_ets} -> partition = hash(key, partitions) key_ets = key_ets!(registry, partition) for {^pid, value} <- safe_lookup_second(key_ets, key), do: value - {:duplicate, partitions, _key_ets, :pid} -> - key_ets = key_ets!(registry, pid, partitions) + {{:duplicate, :pid}, partitions, _key_ets} -> + partition = hash(pid, partitions) + key_ets = key_ets!(registry, partition) for {^pid, value} <- safe_lookup_second(key_ets, key), do: value end end @@ -989,8 +987,8 @@ defmodule Registry do @spec unregister(registry, key) :: :ok def unregister(registry, key) when is_atom(registry) do self = self() - {kind, partitions, key_ets, pid_ets, listeners, partition_by} = info!(registry) - {key_partition, pid_partition} = partitions(kind, key, self, partitions, partition_by) + {kind, partitions, key_ets, pid_ets, listeners} = info!(registry) + {key_partition, pid_partition} = partitions(kind, key, self, partitions) key_ets = key_ets || key_ets!(registry, key_partition) {pid_server, pid_ets} = pid_ets || pid_ets!(registry, pid_partition) @@ -1052,8 +1050,8 @@ defmodule Registry do def unregister_match(registry, key, pattern, guards \\ []) when is_list(guards) do self = self() - {kind, partitions, key_ets, pid_ets, listeners, partition_by} = info!(registry) - {key_partition, pid_partition} = partitions(kind, key, self, partitions, partition_by) + {kind, partitions, key_ets, pid_ets, listeners} = info!(registry) + {key_partition, pid_partition} = partitions(kind, key, self, partitions) key_ets = key_ets || key_ets!(registry, key_partition) {pid_server, pid_ets} = pid_ets || pid_ets!(registry, pid_partition) @@ -1148,8 +1146,8 @@ defmodule Registry do @spec register(registry, key, value) :: {:ok, pid} | {:error, {:already_registered, pid}} def register(registry, key, value) when is_atom(registry) do self = self() - {kind, partitions, key_ets, pid_ets, listeners, partition_by} = info!(registry) - {key_partition, pid_partition} = partitions(kind, key, self, partitions, partition_by) + {kind, partitions, key_ets, pid_ets, listeners} = info!(registry) + {key_partition, pid_partition} = partitions(kind, key, self, partitions) key_ets = key_ets || key_ets!(registry, key_partition) {pid_server, pid_ets} = pid_ets || pid_ets!(registry, pid_partition) @@ -1180,7 +1178,7 @@ defmodule Registry do end end - defp register_key(:duplicate, key_ets, _key, entry) do + defp register_key({:duplicate, _}, key_ets, _key, entry) do true = :ets.insert(key_ets, entry) :ok end @@ -1325,14 +1323,14 @@ defmodule Registry do @spec count(registry) :: non_neg_integer() def count(registry) when is_atom(registry) do case key_info!(registry) do - {_kind, partitions, nil, _} -> + {_kind, partitions, nil} -> 0..(partitions - 1) |> Enum.map(fn partition_index -> safe_size(key_ets!(registry, partition_index)) end) |> Enum.sum() - {_kind, 1, key_ets, _} -> + {_kind, 1, key_ets} -> safe_size(key_ets) end end @@ -1396,14 +1394,14 @@ defmodule Registry do spec = [{{:_, {:_, pattern}}, guards, [true]}] case key_info!(registry) do - {:unique, partitions, key_ets, _} -> + {:unique, partitions, key_ets} -> key_ets = key_ets || key_ets!(registry, key, partitions) :ets.select_count(key_ets, spec) - {:duplicate, 1, key_ets, _} -> + {{:duplicate, _}, 1, key_ets} -> :ets.select_count(key_ets, spec) - {:duplicate, partitions, _key_ets, _} -> + {{:duplicate, _}, partitions, _key_ets} -> 0..(partitions - 1) |> Enum.map(fn partition_index -> :ets.select_count(key_ets!(registry, partition_index), spec) @@ -1467,12 +1465,12 @@ defmodule Registry do spec = group_match_headers(spec, __ENV__.function) case key_info!(registry) do - {_kind, partitions, nil, _} -> + {_kind, partitions, nil} -> Enum.flat_map(0..(partitions - 1), fn partition_index -> :ets.select(key_ets!(registry, partition_index), spec) end) - {_kind, 1, key_ets, _} -> + {_kind, 1, key_ets} -> :ets.select(key_ets, spec) end end @@ -1498,14 +1496,14 @@ defmodule Registry do spec = group_match_headers(spec, __ENV__.function) case key_info!(registry) do - {_kind, partitions, nil, _} -> + {_kind, partitions, nil} -> 0..(partitions - 1) |> Enum.map(fn partition_index -> :ets.select_count(key_ets!(registry, partition_index), spec) end) |> Enum.sum() - {_kind, 1, key_ets, _} -> + {_kind, 1, key_ets} -> :ets.select_count(key_ets, spec) end end @@ -1573,16 +1571,14 @@ defmodule Registry do end end - defp partitions(:unique, key, pid, partitions, _partition_by) do + defp partitions(:unique, key, pid, partitions) do {hash(key, partitions), hash(pid, partitions)} end - - defp partitions(:duplicate, key, _pid, partitions, :key) do + defp partitions({:duplicate, :key}, key, _pid, partitions) do partition = hash(key, partitions) {partition, partition} end - - defp partitions(:duplicate, _key, pid, partitions, :pid) do + defp partitions({:duplicate, :pid}, _key, pid, partitions) do partition = hash(pid, partitions) {partition, partition} end @@ -1616,12 +1612,12 @@ defmodule Registry.Supervisor do @moduledoc false use Supervisor - def start_link(kind, registry, partitions, listeners, entries, compressed, partition_by) do - arg = {kind, registry, partitions, listeners, entries, compressed, partition_by} + def start_link(kind, registry, partitions, listeners, entries, compressed) do + arg = {kind, registry, partitions, listeners, entries, compressed} Supervisor.start_link(__MODULE__, arg, name: registry) end - def init({kind, registry, partitions, listeners, entries, compressed, partition_by}) do + def init({kind, registry, partitions, listeners, entries, compressed}) do ^registry = :ets.new(registry, [:set, :public, :named_table, read_concurrency: true]) true = :ets.insert(registry, entries) @@ -1631,8 +1627,7 @@ defmodule Registry.Supervisor do pid_partition = Registry.Partition.pid_name(registry, i) arg = - {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed, - partition_by} + {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed} %{ id: pid_partition, @@ -1649,9 +1644,10 @@ defmodule Registry.Supervisor do defp strategy_for_kind(:unique), do: :one_for_all # Duplicate registries have both key and pid partitions hashed - # by pid. This means that, if a PID partition crashes, all of + # by key ({:duplicate, :key}) or pid ({:duplicate, :pid}). + # This means that, if a PID or key partition crashes, all of # its associated entries are in its sibling table, so we crash one. - defp strategy_for_kind(:duplicate), do: :one_for_one + defp strategy_for_kind({:duplicate, _}), do: :one_for_one end defmodule Registry.Partition do @@ -1704,11 +1700,9 @@ defmodule Registry.Partition do ## Callbacks - def init( - {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed, - partition_by} - ) do + def init({kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed}) do Process.flag(:trap_exit, true) + key_ets = init_key_ets(kind, key_partition, compressed) pid_ets = init_pid_ets(kind, pid_partition) @@ -1716,8 +1710,8 @@ defmodule Registry.Partition do # is to write the table information alongside the registry info. if partitions == 1 do entries = [ - {@key_info, {kind, partitions, key_ets, partition_by}}, - {@all_info, {kind, partitions, key_ets, {self(), pid_ets}, listeners, partition_by}} + {@key_info, {kind, partitions, key_ets}}, + {@all_info, {kind, partitions, key_ets, {self(), pid_ets}, listeners}} ] true = :ets.insert(registry, entries) @@ -1735,7 +1729,7 @@ defmodule Registry.Partition do :ets.new(key_partition, compression_opt(opts, compressed)) end - defp init_key_ets(:duplicate, key_partition, compressed) do + defp init_key_ets({:duplicate, _}, key_partition, compressed) do opts = [:duplicate_bag, :public, read_concurrency: true, write_concurrency: true] :ets.new(key_partition, compression_opt(opts, compressed)) end diff --git a/lib/elixir/test/elixir/registry_test.exs b/lib/elixir/test/elixir/registry_test.exs index 18fd5d0ee99..31341ed479e 100644 --- a/lib/elixir/test/elixir/registry_test.exs +++ b/lib/elixir/test/elixir/registry_test.exs @@ -23,7 +23,6 @@ defmodule Registry.Test do listeners = List.wrap(config[:base_listener]) |> Enum.map(&:"#{&1}_#{partitions}") name = :"#{config.test}_#{partitions}" opts = [keys: keys, name: name, partitions: partitions, listeners: listeners] - opts = if config[:partition_by], do: opts ++ [partition_by: config.partition_by], else: opts {:ok, _} = start_supervised({Registry, opts}) %{registry: name, listeners: listeners} end @@ -933,9 +932,9 @@ defmodule Registry.Test do ]) end - test "works with partition_by: :key", %{partitions: partitions} do - name = :"test_partition_by_keys_#{partitions}" - opts = [keys: :duplicate, name: name, partitions: partitions, partition_by: :key] + test "works with tuple syntax {:duplicate, :key}", %{partitions: partitions} do + name = :"test_tuple_keys_#{partitions}" + opts = [keys: {:duplicate, :key}, name: name, partitions: partitions] {:ok, _} = start_supervised({Registry, opts}) {:ok, _} = Registry.register(name, "hello", :value1) @@ -947,9 +946,9 @@ defmodule Registry.Test do assert Registry.values(name, "world", self()) == [:value3] end - test "works with partition_by: :pid", %{partitions: partitions} do - name = :"test_partition_by_pids_#{partitions}" - opts = [keys: :duplicate, name: name, partitions: partitions, partition_by: :pid] + test "works with tuple syntax {:duplicate, :pid}", %{partitions: partitions} do + name = :"test_tuple_pids_#{partitions}" + opts = [keys: {:duplicate, :pid}, name: name, partitions: partitions] {:ok, _} = start_supervised({Registry, opts}) {:ok, _} = Registry.register(name, "hello", :value1) @@ -960,17 +959,25 @@ defmodule Registry.Test do assert Registry.values(name, "hello", self()) |> Enum.sort() == [:value1, :value2] assert Registry.values(name, "world", self()) == [:value3] end + + test "rejects invalid tuple syntax", %{partitions: partitions} do + name = :"test_invalid_tuple_#{partitions}" + + assert_raise ArgumentError, ~r/expected :keys to be given and be one of/, fn -> + Registry.start_link(keys: {:duplicate, :invalid}, name: name, partitions: partitions) + end + end end # Note: those tests relies on internals - for {keys, partition_by} <- [ - {:unique, nil}, + for keys <- [ + :unique, + :duplicate, {:duplicate, :pid}, {:duplicate, :key} ] do @tag keys: keys - @tag partition_by: partition_by - test "clean up #{keys} registry on process crash with partition_by: #{partition_by || "default"}", + test "clean up #{inspect(keys)} registry on process crash", %{registry: registry, partitions: partitions} do {_, task1} = register_task(registry, "hello", :value) {_, task2} = register_task(registry, "world", :value) @@ -992,7 +999,7 @@ defmodule Registry.Test do assert :ets.tab2list(pid) == [] end else - [{-1, {_, _, key, {partition, pid}, _, _}}] = :ets.lookup(registry, -1) + [{-1, {_, _, key, {partition, pid}, _}}] = :ets.lookup(registry, -1) GenServer.call(partition, :sync) assert :ets.tab2list(key) == [] assert :ets.tab2list(pid) == [] From fa31cdcaacad06c633fd143eb52030e58bfe2f03 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Thu, 17 Jul 2025 12:55:42 +0200 Subject: [PATCH 4/7] Test various duplicate registry settings in for loop --- lib/elixir/lib/registry.ex | 2 ++ lib/elixir/test/elixir/registry_test.exs | 39 +++++++++++------------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/lib/elixir/lib/registry.ex b/lib/elixir/lib/registry.ex index 9cda3d97567..e32cb54e350 100644 --- a/lib/elixir/lib/registry.ex +++ b/lib/elixir/lib/registry.ex @@ -1574,10 +1574,12 @@ defmodule Registry do defp partitions(:unique, key, pid, partitions) do {hash(key, partitions), hash(pid, partitions)} end + defp partitions({:duplicate, :key}, key, _pid, partitions) do partition = hash(key, partitions) {partition, partition} end + defp partitions({:duplicate, :pid}, _key, pid, partitions) do partition = hash(pid, partitions) {partition, partition} diff --git a/lib/elixir/test/elixir/registry_test.exs b/lib/elixir/test/elixir/registry_test.exs index 31341ed479e..a035d5adbb8 100644 --- a/lib/elixir/test/elixir/registry_test.exs +++ b/lib/elixir/test/elixir/registry_test.exs @@ -932,23 +932,26 @@ defmodule Registry.Test do ]) end - test "works with tuple syntax {:duplicate, :key}", %{partitions: partitions} do - name = :"test_tuple_keys_#{partitions}" - opts = [keys: {:duplicate, :key}, name: name, partitions: partitions] - {:ok, _} = start_supervised({Registry, opts}) - - {:ok, _} = Registry.register(name, "hello", :value1) - {:ok, _} = Registry.register(name, "hello", :value2) - {:ok, _} = Registry.register(name, "world", :value3) + test "rejects invalid tuple syntax", %{partitions: partitions} do + name = :"test_invalid_tuple_#{partitions}" - assert 3 == Registry.count(name) - assert Registry.values(name, "hello", self()) |> Enum.sort() == [:value1, :value2] - assert Registry.values(name, "world", self()) == [:value3] + assert_raise ArgumentError, ~r/expected :keys to be given and be one of/, fn -> + Registry.start_link(keys: {:duplicate, :invalid}, name: name, partitions: partitions) + end end + end - test "works with tuple syntax {:duplicate, :pid}", %{partitions: partitions} do - name = :"test_tuple_pids_#{partitions}" - opts = [keys: {:duplicate, :pid}, name: name, partitions: partitions] + for {keys, partitions} <- [ + {{:duplicate, :key}, 1}, + {{:duplicate, :key}, 8}, + {{:duplicate, :pid}, 1}, + {{:duplicate, :pid}, 8} + ] do + @tag keys: keys, partitions: partitions + test "works with tuple syntax #{inspect(keys)} with #{partitions} partitions", + %{keys: keys, partitions: partitions} do + name = :"test_tuple_#{elem(keys, 1)}_#{partitions}" + opts = [keys: keys, name: name, partitions: partitions] {:ok, _} = start_supervised({Registry, opts}) {:ok, _} = Registry.register(name, "hello", :value1) @@ -959,14 +962,6 @@ defmodule Registry.Test do assert Registry.values(name, "hello", self()) |> Enum.sort() == [:value1, :value2] assert Registry.values(name, "world", self()) == [:value3] end - - test "rejects invalid tuple syntax", %{partitions: partitions} do - name = :"test_invalid_tuple_#{partitions}" - - assert_raise ArgumentError, ~r/expected :keys to be given and be one of/, fn -> - Registry.start_link(keys: {:duplicate, :invalid}, name: name, partitions: partitions) - end - end end # Note: those tests relies on internals From 09d5ea3ae6a1c341c9d8bb0b14f89b8e785ba71f Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Thu, 17 Jul 2025 13:00:08 +0200 Subject: [PATCH 5/7] Restore Enum.sum_by used in Registry.count --- lib/elixir/lib/registry.ex | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/elixir/lib/registry.ex b/lib/elixir/lib/registry.ex index e32cb54e350..abb874ee018 100644 --- a/lib/elixir/lib/registry.ex +++ b/lib/elixir/lib/registry.ex @@ -1324,11 +1324,9 @@ defmodule Registry do def count(registry) when is_atom(registry) do case key_info!(registry) do {_kind, partitions, nil} -> - 0..(partitions - 1) - |> Enum.map(fn partition_index -> + Enum.sum_by(0..(partitions - 1), fn partition_index -> safe_size(key_ets!(registry, partition_index)) end) - |> Enum.sum() {_kind, 1, key_ets} -> safe_size(key_ets) From 2da924c71dc6cc01a8e9180ca343f2d7225caf04 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Thu, 17 Jul 2025 13:01:16 +0200 Subject: [PATCH 6/7] Restore Enum.sum_by used in Registry.count_match --- lib/elixir/lib/registry.ex | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/elixir/lib/registry.ex b/lib/elixir/lib/registry.ex index abb874ee018..41e8dd1452b 100644 --- a/lib/elixir/lib/registry.ex +++ b/lib/elixir/lib/registry.ex @@ -1400,11 +1400,9 @@ defmodule Registry do :ets.select_count(key_ets, spec) {{:duplicate, _}, partitions, _key_ets} -> - 0..(partitions - 1) - |> Enum.map(fn partition_index -> + Enum.sum_by(0..(partitions - 1), fn partition_index -> :ets.select_count(key_ets!(registry, partition_index), spec) end) - |> Enum.sum() end end From b83745cd8ec105fed5658c4ad31613e6692f194a Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Thu, 17 Jul 2025 13:02:28 +0200 Subject: [PATCH 7/7] Restore Enum.sum_by used in Registry.count_select --- lib/elixir/lib/registry.ex | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/elixir/lib/registry.ex b/lib/elixir/lib/registry.ex index 41e8dd1452b..451645b79bf 100644 --- a/lib/elixir/lib/registry.ex +++ b/lib/elixir/lib/registry.ex @@ -1493,11 +1493,9 @@ defmodule Registry do case key_info!(registry) do {_kind, partitions, nil} -> - 0..(partitions - 1) - |> Enum.map(fn partition_index -> + Enum.sum_by(0..(partitions - 1), fn partition_index -> :ets.select_count(key_ets!(registry, partition_index), spec) end) - |> Enum.sum() {_kind, 1, key_ets} -> :ets.select_count(key_ets, spec)