diff --git a/lib/elixir/lib/registry.ex b/lib/elixir/lib/registry.ex index d35f82bc12..451645b79b 100644 --- a/lib/elixir/lib/registry.ex +++ b/lib/elixir/lib/registry.ex @@ -187,7 +187,7 @@ defmodule Registry do Note that the registry uses one ETS table plus two ETS tables per partition. """ - @keys [:unique, :duplicate] + @keys [:unique, :duplicate, {:duplicate, :key}, {:duplicate, :pid}] @all_info -1 @key_info -2 @@ -195,7 +195,7 @@ defmodule Registry do @type registry :: atom @typedoc "The type of the registry" - @type keys :: :unique | :duplicate + @type keys :: :unique | :duplicate | {:duplicate, :key} | {:duplicate, :pid} @typedoc "The type of keys allowed on registration" @type key :: term @@ -266,8 +266,8 @@ defmodule Registry do :undefined end - {kind, _, _} -> - raise ArgumentError, ":via is not supported for #{kind} registries" + {{:duplicate, _}, _, _} -> + raise ArgumentError, ":via is not supported for duplicate registries" end end @@ -329,11 +329,24 @@ defmodule Registry do {Registry, keys: :unique, name: MyApp.Registry, partitions: System.schedulers_online()} ], strategy: :one_for_one) + For `:duplicate` registries with many different keys (e.g., many topics with + few subscribers each), you can optimize key-based lookups by partitioning by key: + + Registry.start_link( + keys: {:duplicate, :key}, + name: MyApp.TopicRegistry, + partitions: System.schedulers_online() + ) + + This allows key-based lookups to check only a single partition instead of + searching all partitions. Use the default `:pid` partitioning when you have + fewer keys with many entries each (e.g., one topic with many subscribers). + ## Options The registry requires the following keys: - * `:keys` - chooses if keys are `:unique` or `:duplicate` + * `:keys` - chooses if keys are `:unique`, `:duplicate`, `{:duplicate, :key}`, or `{:duplicate, :pid}` * `:name` - the name of the registry and its tables The following keys are optional: @@ -345,16 +358,40 @@ defmodule Registry do crashes. Messages sent to listeners are of type `t:listener_message/0`. * `:meta` - a keyword list of metadata to be attached to the registry. + For `:duplicate` registries, you can specify the partitioning strategy + directly in the `:keys` option: + + * `:duplicate` or `{:duplicate, :pid}` - Use `:pid` partitioning (default) + when you have keys with many entries (e.g., one topic with many subscribers). + This is the traditional behavior and groups all entries from the same process together. + + * `{:duplicate, :key}` - Use `:key` partitioning when entries are spread across + many different keys (e.g., many topics with few subscribers each). This makes + key-based lookups more efficient as they only need to check a single partition + instead of all partitions. + """ @doc since: "1.5.0" @spec start_link([start_option]) :: {:ok, pid} | {:error, term} def start_link(options) do keys = Keyword.get(options, :keys) - if keys not in @keys do - raise ArgumentError, - "expected :keys to be given and be one of :unique or :duplicate, got: #{inspect(keys)}" - end + # Validate and normalize keys format + kind = + case keys do + {:duplicate, partition_strategy} when partition_strategy in [:key, :pid] -> + {:duplicate, partition_strategy} + + :unique -> + :unique + + :duplicate -> + {:duplicate, :pid} + + _ -> + raise ArgumentError, + "expected :keys to be given and be one of :unique, :duplicate, {:duplicate, :key}, or {:duplicate, :pid}, got: #{inspect(keys)}" + end name = case Keyword.fetch(options, :name) do @@ -397,11 +434,18 @@ defmodule Registry do # The @info format must be kept in sync with Registry.Partition optimization. entries = [ - {@all_info, {keys, partitions, nil, nil, listeners}}, - {@key_info, {keys, partitions, nil}} | meta + {@all_info, {kind, partitions, nil, nil, listeners}}, + {@key_info, {kind, partitions, nil}} | meta ] - Registry.Supervisor.start_link(keys, name, partitions, listeners, entries, compressed) + Registry.Supervisor.start_link( + kind, + name, + partitions, + listeners, + entries, + compressed + ) end @doc false @@ -467,7 +511,7 @@ defmodule Registry do :error end - {kind, _, _} -> + {kind, _, _, _} -> raise ArgumentError, "Registry.update_value/3 is not supported for #{kind} registries" end end @@ -508,12 +552,12 @@ defmodule Registry do |> List.wrap() |> apply_non_empty_to_mfa_or_fun(mfa_or_fun) - {:duplicate, 1, key_ets} -> + {{:duplicate, _}, 1, key_ets} -> key_ets |> safe_lookup_second(key) |> apply_non_empty_to_mfa_or_fun(mfa_or_fun) - {:duplicate, partitions, _} -> + {{:duplicate, _}, partitions, _} -> if Keyword.get(opts, :parallel, false) do registry |> dispatch_parallel(key, mfa_or_fun, partitions) @@ -625,10 +669,14 @@ defmodule Registry do [] end - {:duplicate, 1, key_ets} -> + {{:duplicate, _}, 1, key_ets} -> safe_lookup_second(key_ets, key) - {:duplicate, partitions, _key_ets} -> + {{:duplicate, :key}, partitions, _key_ets} -> + partition = hash(key, partitions) + safe_lookup_second(key_ets!(registry, partition), key) + + {{:duplicate, :pid}, partitions, _key_ets} -> for partition <- 0..(partitions - 1), pair <- safe_lookup_second(key_ets!(registry, partition), key), do: pair @@ -749,10 +797,10 @@ defmodule Registry do key_ets = key_ets || key_ets!(registry, key, partitions) :ets.select(key_ets, spec) - {:duplicate, 1, key_ets} -> + {{:duplicate, _}, 1, key_ets} -> :ets.select(key_ets, spec) - {:duplicate, partitions, _key_ets} -> + {{:duplicate, _}, partitions, _key_ets} -> for partition <- 0..(partitions - 1), pair <- :ets.select(key_ets!(registry, partition), spec), do: pair @@ -882,8 +930,17 @@ defmodule Registry do [] end - {:duplicate, partitions, key_ets} -> - key_ets = key_ets || key_ets!(registry, pid, partitions) + {{:duplicate, _}, 1, key_ets} -> + for {^pid, value} <- safe_lookup_second(key_ets, key), do: value + + {{:duplicate, :key}, partitions, _key_ets} -> + partition = hash(key, partitions) + key_ets = key_ets!(registry, partition) + for {^pid, value} <- safe_lookup_second(key_ets, key), do: value + + {{:duplicate, :pid}, partitions, _key_ets} -> + partition = hash(pid, partitions) + key_ets = key_ets!(registry, partition) for {^pid, value} <- safe_lookup_second(key_ets, key), do: value end end @@ -1121,7 +1178,7 @@ defmodule Registry do end end - defp register_key(:duplicate, key_ets, _key, entry) do + defp register_key({:duplicate, _}, key_ets, _key, entry) do true = :ets.insert(key_ets, entry) :ok end @@ -1339,10 +1396,10 @@ defmodule Registry do key_ets = key_ets || key_ets!(registry, key, partitions) :ets.select_count(key_ets, spec) - {:duplicate, 1, key_ets} -> + {{:duplicate, _}, 1, key_ets} -> :ets.select_count(key_ets, spec) - {:duplicate, partitions, _key_ets} -> + {{:duplicate, _}, partitions, _key_ets} -> Enum.sum_by(0..(partitions - 1), fn partition_index -> :ets.select_count(key_ets!(registry, partition_index), spec) end) @@ -1512,7 +1569,12 @@ defmodule Registry do {hash(key, partitions), hash(pid, partitions)} end - defp partitions(:duplicate, _key, pid, partitions) do + defp partitions({:duplicate, :key}, key, _pid, partitions) do + partition = hash(key, partitions) + {partition, partition} + end + + defp partitions({:duplicate, :pid}, _key, pid, partitions) do partition = hash(pid, partitions) {partition, partition} end @@ -1559,7 +1621,9 @@ defmodule Registry.Supervisor do for i <- 0..(partitions - 1) do key_partition = Registry.Partition.key_name(registry, i) pid_partition = Registry.Partition.pid_name(registry, i) - arg = {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed} + + arg = + {kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed} %{ id: pid_partition, @@ -1576,9 +1640,10 @@ defmodule Registry.Supervisor do defp strategy_for_kind(:unique), do: :one_for_all # Duplicate registries have both key and pid partitions hashed - # by pid. This means that, if a PID partition crashes, all of + # by key ({:duplicate, :key}) or pid ({:duplicate, :pid}). + # This means that, if a PID or key partition crashes, all of # its associated entries are in its sibling table, so we crash one. - defp strategy_for_kind(:duplicate), do: :one_for_one + defp strategy_for_kind({:duplicate, _}), do: :one_for_one end defmodule Registry.Partition do @@ -1633,6 +1698,7 @@ defmodule Registry.Partition do def init({kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed}) do Process.flag(:trap_exit, true) + key_ets = init_key_ets(kind, key_partition, compressed) pid_ets = init_pid_ets(kind, pid_partition) @@ -1659,7 +1725,7 @@ defmodule Registry.Partition do :ets.new(key_partition, compression_opt(opts, compressed)) end - defp init_key_ets(:duplicate, key_partition, compressed) do + defp init_key_ets({:duplicate, _}, key_partition, compressed) do opts = [:duplicate_bag, :public, read_concurrency: true, write_concurrency: true] :ets.new(key_partition, compression_opt(opts, compressed)) end diff --git a/lib/elixir/test/elixir/registry_test.exs b/lib/elixir/test/elixir/registry_test.exs index d19264c3f9..a035d5adbb 100644 --- a/lib/elixir/test/elixir/registry_test.exs +++ b/lib/elixir/test/elixir/registry_test.exs @@ -931,12 +931,48 @@ defmodule Registry.Test do {{"world", :_, :_}, [], [true]} ]) end + + test "rejects invalid tuple syntax", %{partitions: partitions} do + name = :"test_invalid_tuple_#{partitions}" + + assert_raise ArgumentError, ~r/expected :keys to be given and be one of/, fn -> + Registry.start_link(keys: {:duplicate, :invalid}, name: name, partitions: partitions) + end + end + end + + for {keys, partitions} <- [ + {{:duplicate, :key}, 1}, + {{:duplicate, :key}, 8}, + {{:duplicate, :pid}, 1}, + {{:duplicate, :pid}, 8} + ] do + @tag keys: keys, partitions: partitions + test "works with tuple syntax #{inspect(keys)} with #{partitions} partitions", + %{keys: keys, partitions: partitions} do + name = :"test_tuple_#{elem(keys, 1)}_#{partitions}" + opts = [keys: keys, name: name, partitions: partitions] + {:ok, _} = start_supervised({Registry, opts}) + + {:ok, _} = Registry.register(name, "hello", :value1) + {:ok, _} = Registry.register(name, "hello", :value2) + {:ok, _} = Registry.register(name, "world", :value3) + + assert 3 == Registry.count(name) + assert Registry.values(name, "hello", self()) |> Enum.sort() == [:value1, :value2] + assert Registry.values(name, "world", self()) == [:value3] + end end # Note: those tests relies on internals - for keys <- [:unique, :duplicate] do + for keys <- [ + :unique, + :duplicate, + {:duplicate, :pid}, + {:duplicate, :key} + ] do @tag keys: keys - test "clean up #{keys} registry on process crash", + test "clean up #{inspect(keys)} registry on process crash", %{registry: registry, partitions: partitions} do {_, task1} = register_task(registry, "hello", :value) {_, task2} = register_task(registry, "world", :value)