From 83f2f0338fabe7226b30ee3a9b13e35b4361af03 Mon Sep 17 00:00:00 2001 From: Steffen Deusch Date: Wed, 5 Feb 2025 12:09:34 +0100 Subject: [PATCH] add [:phoenix, :socket_drain] telemetry event (#6070) and also use it for logging drain events --- installer/templates/phx_web/telemetry.ex | 1 + lib/phoenix/endpoint.ex | 2 ++ lib/phoenix/logger.ex | 22 ++++++++++++++++++ lib/phoenix/socket.ex | 6 ++++- lib/phoenix/socket/pool_supervisor.ex | 29 ++++++++++++++++-------- test/phoenix/socket/socket_test.exs | 8 ++++--- 6 files changed, 54 insertions(+), 14 deletions(-) diff --git a/installer/templates/phx_web/telemetry.ex b/installer/templates/phx_web/telemetry.ex index 4edb792901..5662c7f7c9 100644 --- a/installer/templates/phx_web/telemetry.ex +++ b/installer/templates/phx_web/telemetry.ex @@ -43,6 +43,7 @@ defmodule <%= @web_namespace %>.Telemetry do summary("phoenix.socket_connected.duration", unit: {:native, :millisecond} ), + sum("phoenix.socket_drain.count"), summary("phoenix.channel_joined.duration", unit: {:native, :millisecond} ), diff --git a/lib/phoenix/endpoint.ex b/lib/phoenix/endpoint.ex index 346a4d84a4..0eb84851fd 100644 --- a/lib/phoenix/endpoint.ex +++ b/lib/phoenix/endpoint.ex @@ -782,6 +782,8 @@ defmodule Phoenix.Endpoint do batch to terminate. Defaults to 2000ms. * `:shutdown` - The maximum amount of time in milliseconds allowed to drain all batches. Defaults to 30000ms. + * `:log` - the log level for drain actions. Defaults the `:log` option + passed to `use Phoenix.Socket` or `:info`. Set it to `false` to disable logging. For example, if you have 150k connections, the default values will split them into 15 batches of 10k connections. Each batch takes diff --git a/lib/phoenix/logger.ex b/lib/phoenix/logger.ex index 97f7fea7a5..299df47acb 100644 --- a/lib/phoenix/logger.ex +++ b/lib/phoenix/logger.ex @@ -58,6 +58,11 @@ defmodule Phoenix.Logger do * Metadata: `%{endpoint: atom, transport: atom, params: term, connect_info: map, vsn: binary, user_socket: atom, result: :ok | :error, serializer: atom, log: Logger.level | false}` * Disable logging: `use Phoenix.Socket, log: false` or `socket "/foo", MySocket, websocket: [log: false]` in your endpoint + * `[:phoenix, :socket_drain]` - dispatched by `Phoenix.Socket` when using the `:drainer` option + * Measurement: `%{count: integer, total: integer, index: integer, rounds: integer}` + * Metadata: `%{endpoint: atom, socket: atom, intervasl: integer, log: Logger.level | false}` + * Disable logging: `use Phoenix.Socket, log: false` in your endpoint or pass `:log` option in the `:drainer` option + * `[:phoenix, :channel_joined]` - dispatched at the end of a channel join * Measurement: `%{duration: native_time}` * Metadata: `%{result: :ok | :error, params: term, socket: Phoenix.Socket.t}` @@ -134,6 +139,7 @@ defmodule Phoenix.Logger do [:phoenix, :router_dispatch, :start] => &__MODULE__.phoenix_router_dispatch_start/4, [:phoenix, :error_rendered] => &__MODULE__.phoenix_error_rendered/4, [:phoenix, :socket_connected] => &__MODULE__.phoenix_socket_connected/4, + [:phoenix, :socket_drain] => &__MODULE__.phoenix_socket_drain/4, [:phoenix, :channel_joined] => &__MODULE__.phoenix_channel_joined/4, [:phoenix, :channel_handled_in] => &__MODULE__.phoenix_channel_handled_in/4 } @@ -334,6 +340,22 @@ defmodule Phoenix.Logger do defp connect_result(:ok), do: "CONNECTED TO " defp connect_result(:error), do: "REFUSED CONNECTION TO " + @doc false + def phoenix_socket_drain(_, _, %{log: false}, _), do: :ok + + def phoenix_socket_drain(_, %{count: count, total: total, index: index, rounds: rounds}, %{log: level} = meta, _) do + Logger.log(level, fn -> + %{socket: socket, interval: interval} = meta + + [ + "DRAINING #{count} of #{total} total connection(s) for socket ", + inspect(socket), + " every #{interval}ms - ", + "round #{index} of #{rounds}" + ] + end) + end + ## Event: [:phoenix, :channel_joined] @doc false diff --git a/lib/phoenix/socket.ex b/lib/phoenix/socket.ex index 77286a8db1..912f41441d 100644 --- a/lib/phoenix/socket.ex +++ b/lib/phoenix/socket.ex @@ -456,10 +456,14 @@ defmodule Phoenix.Socket do case drainer do {module, function, arguments} -> apply(module, function, arguments) + _ -> drainer end - {Phoenix.Socket.PoolDrainer, {endpoint, handler, drainer}} + + opts = Keyword.merge(opts, drainer: drainer) + + {Phoenix.Socket.PoolDrainer, {endpoint, handler, opts}} else :ignore end diff --git a/lib/phoenix/socket/pool_supervisor.ex b/lib/phoenix/socket/pool_supervisor.ex index 936c0affee..833e64c2be 100644 --- a/lib/phoenix/socket/pool_supervisor.ex +++ b/lib/phoenix/socket/pool_supervisor.ex @@ -76,7 +76,7 @@ defmodule Phoenix.Socket.PoolDrainer do %{ id: {:terminator, name}, start: {__MODULE__, :start_link, [tuple]}, - shutdown: Keyword.get(opts, :shutdown, 30_000) + shutdown: Keyword.get(opts[:drainer], :shutdown, 30_000) } end @@ -87,13 +87,14 @@ defmodule Phoenix.Socket.PoolDrainer do @impl true def init({endpoint, name, opts}) do Process.flag(:trap_exit, true) - size = Keyword.get(opts, :batch_size, 10_000) - interval = Keyword.get(opts, :batch_interval, 2_000) - {:ok, {endpoint, name, size, interval}} + size = Keyword.get(opts[:drainer], :batch_size, 10_000) + interval = Keyword.get(opts[:drainer], :batch_interval, 2_000) + log_level = Keyword.get(opts[:drainer], :log, opts[:log] || :info) + {:ok, {endpoint, name, size, interval, log_level}} end @impl true - def terminate(_reason, {endpoint, name, size, interval}) do + def terminate(_reason, {endpoint, name, size, interval, log_level}) do ets = endpoint.config({:socket, name}) partitions = :ets.lookup_element(ets, :partitions, 2) @@ -110,12 +111,20 @@ defmodule Phoenix.Socket.PoolDrainer do rounds = div(total, size) + 1 - if total != 0 do - Logger.info("Shutting down #{total} sockets in #{rounds} rounds of #{interval}ms") - end - for {pids, index} <- - collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do + collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do + count = if index == rounds, do: length(pids), else: size + + :telemetry.execute( + [:phoenix, :socket_drain], + %{count: count, total: total, index: index, rounds: rounds}, + %{ + endpoint: endpoint, + socket: name, + interval: interval, + log: log_level + } + ) spawn(fn -> for pid <- pids do diff --git a/test/phoenix/socket/socket_test.exs b/test/phoenix/socket/socket_test.exs index d089a8de29..9ac57b4259 100644 --- a/test/phoenix/socket/socket_test.exs +++ b/test/phoenix/socket/socket_test.exs @@ -76,7 +76,7 @@ defmodule Phoenix.SocketTest do test "merges keyword lists" do socket = %Phoenix.Socket{} socket = assign(socket, %{foo: :bar, abc: :def}) - socket = assign(socket, [foo: :baz]) + socket = assign(socket, foo: :baz) assert socket.assigns[:foo] == :baz assert socket.assigns[:abc] == :def end @@ -109,7 +109,8 @@ defmodule Phoenix.SocketTest do ] assert DrainerSpecSocket.drainer_spec(drainer: drainer_spec, endpoint: Endpoint) == - {Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}} + {Phoenix.Socket.PoolDrainer, + {Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}} end test "loads dynamic drainer config" do @@ -119,7 +120,8 @@ defmodule Phoenix.SocketTest do drainer: {DrainerSpecSocket, :dynamic_drainer_config, []}, endpoint: Endpoint ) == - {Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}} + {Phoenix.Socket.PoolDrainer, + {Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}} end test "returns ignore if drainer is set to false" do