diff --git a/lib/statix.ex b/lib/statix.ex index 818101f..2bf21d8 100644 --- a/lib/statix.ex +++ b/lib/statix.ex @@ -40,15 +40,24 @@ defmodule Statix do config :statix, MyApp.Statix, port: 8123 + Unix domain sockets are also supported for local connections: + + config :statix, MyApp.Statix, + socket_path: "/var/run/statsd.sock" + The following is a list of all the supported options: * `:prefix` - (binary) all metrics sent to the StatsD-compatible server through the configured Statix connection will be prefixed with the value of this option. By default this option is not present. * `:host` - (binary) the host where the StatsD-compatible server is running. - Defaults to `"127.0.0.1"`. + Defaults to `"127.0.0.1"`. Ignored if `:socket_path` is set. * `:port` - (integer) the port (on `:host`) where the StatsD-compatible - server is running. Defaults to `8125`. + server is running. Defaults to `8125`. Ignored if `:socket_path` is set. + * `:socket_path` - (binary) path to a Unix domain socket for the StatsD-compatible + server. When this option is present, `:host` and `:port` are ignored and the + connection uses Unix domain sockets instead of UDP. Requires OTP 22+. + By default this option is not present. * `:tags` - ([binary]) a list of global tags that will be sent with all metrics. By default this option is not present. See the "Tags" section for more information. @@ -105,6 +114,8 @@ defmodule Statix do alias __MODULE__.Conn + require Logger + @type key :: iodata @type options :: [sample_rate: float, tags: [String.t()]] @type on_send :: :ok | {:error, term} @@ -359,7 +370,14 @@ defmodule Statix do @doc false def new(module, options) do config = get_config(module, options) - conn = Conn.new(config.host, config.port, config.prefix) + + # Determine transport based on socket_path presence + conn = + if config.socket_path do + Conn.new(config.socket_path, config.prefix) + else + Conn.new(config.host, config.port, config.prefix) + end %__MODULE__{ conn: conn, @@ -375,6 +393,17 @@ defmodule Statix do end @doc false + def open(%__MODULE__{conn: %{transport: :uds, sock: {:socket_path, path}} = conn, pool: pool}) do + # UDS sockets are socket references (not ports), so they cannot be registered as process names. + # Instead, store them in ConnTracker's ETS table. + connections = + Enum.map(pool, fn _name -> + Conn.open(conn) + end) + + Statix.ConnTracker.set(path, connections) + end + def open(%__MODULE__{conn: conn, pool: pool}) do Enum.each(pool, fn name -> %{sock: sock} = Conn.open(conn) @@ -384,16 +413,37 @@ defmodule Statix do @doc false def transmit( - %{conn: conn, pool: pool, tags: tags}, + %{conn: %{transport: :uds, socket_path: path}, tags: tags}, type, key, value, options ) when (is_binary(key) or is_list(key)) and is_list(options) do - sample_rate = Keyword.get(options, :sample_rate) + if should_send?(options) do + options = put_global_tags(options, tags) + + case Statix.ConnTracker.get(path) do + {:ok, conn} -> + Conn.transmit(conn, type, key, to_string(value), options) + + {:error, :not_found} -> + {:error, :socket_not_found} + end + else + :ok + end + end - if is_nil(sample_rate) or sample_rate >= :rand.uniform() do + def transmit( + %{conn: conn, pool: pool, tags: tags}, + type, + key, + value, + options + ) + when (is_binary(key) or is_list(key)) and is_list(options) do + if should_send?(options) do options = put_global_tags(options, tags) %{conn | sock: pick_name(pool)} @@ -403,6 +453,11 @@ defmodule Statix do end end + # Takes first :sample_rate occurrence (standard keyword list behavior) + defp should_send?([]), do: true + defp should_send?([{:sample_rate, rate} | _]), do: rate >= :rand.uniform() + defp should_send?([_ | rest]), do: should_send?(rest) + defp pick_name([name]), do: name defp pick_name(pool), do: Enum.random(pool) @@ -420,13 +475,29 @@ defmodule Statix do env |> Keyword.get_values(:tags) |> Enum.concat() end) - %{ + config = %{ prefix: build_prefix(env, overrides), host: Keyword.get(options, :host, "127.0.0.1"), port: Keyword.get(options, :port, 8125), + socket_path: Keyword.get(options, :socket_path), pool_size: Keyword.get(options, :pool_size, 1), tags: tags } + + # Warn if both socket_path and host/port are specified + if config.socket_path do + has_custom_host = Keyword.has_key?(options, :host) + has_custom_port = Keyword.has_key?(options, :port) + + if has_custom_host or has_custom_port do + Logger.warning( + "Both socket_path and host/port specified for #{inspect(module)}. " <> + "Using socket_path=#{config.socket_path}, ignoring host/port." + ) + end + end + + config end defp build_prefix(env, overrides) do diff --git a/lib/statix/application.ex b/lib/statix/application.ex new file mode 100644 index 0000000..e0721fd --- /dev/null +++ b/lib/statix/application.ex @@ -0,0 +1,13 @@ +defmodule Statix.Application do + @moduledoc false + + use Application + + def start(_type, _args) do + children = [ + Statix.ConnTracker + ] + + Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor) + end +end diff --git a/lib/statix/conn.ex b/lib/statix/conn.ex index 36d68b3..1604e55 100644 --- a/lib/statix/conn.ex +++ b/lib/statix/conn.ex @@ -1,7 +1,11 @@ defmodule Statix.Conn do @moduledoc false - defstruct [:sock, :address, :port, :prefix] + # sock field holds different types depending on state and transport: + # - UDP: port (after open/1) or atom for process name + # - UDS: {:socket_path, path} before open/1, socket reference after + # socket_path field preserves the UDS path even after opening + defstruct [:sock, :address, :port, :prefix, :transport, :socket_path] alias Statix.Packet @@ -14,7 +18,7 @@ defmodule Statix.Conn do def new(host, port, prefix) when is_list(host) or is_tuple(host) do case :inet.getaddr(host, :inet) do {:ok, address} -> - %__MODULE__{address: address, port: port, prefix: prefix} + %__MODULE__{address: address, port: port, prefix: prefix, transport: :udp} {:error, reason} -> raise( @@ -24,11 +28,38 @@ defmodule Statix.Conn do end end - def open(%__MODULE__{} = conn) do + def new(socket_path, prefix) when is_binary(socket_path) do + %__MODULE__{ + prefix: prefix, + transport: :uds, + sock: {:socket_path, socket_path}, + socket_path: socket_path + } + end + + def open(%__MODULE__{transport: :udp} = conn) do {:ok, sock} = :gen_udp.open(0, active: false) %__MODULE__{conn | sock: sock} end + def open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do + unless Code.ensure_loaded?(:socket) do + raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module." + end + + {:ok, sock} = :socket.open(:local, :dgram, :default) + path_addr = %{family: :local, path: String.to_charlist(path)} + + case :socket.connect(sock, path_addr) do + :ok -> + %__MODULE__{conn | sock: sock} + + {:error, reason} -> + :socket.close(sock) + raise "Failed to connect to Unix domain socket at #{path}: #{inspect(reason)}" + end + end + def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options) when is_binary(val) and is_list(options) do result = @@ -47,7 +78,12 @@ defmodule Statix.Conn do result end - defp transmit(packet, %__MODULE__{address: address, port: port, sock: sock_name}) do + defp transmit(packet, %__MODULE__{ + transport: :udp, + address: address, + port: port, + sock: sock_name + }) do sock = Process.whereis(sock_name) if sock do @@ -56,4 +92,13 @@ defmodule Statix.Conn do {:error, :port_closed} end end + + defp transmit(packet, %__MODULE__{transport: :uds, sock: sock}) do + # UDS DGRAM sockets send atomically + :socket.send(sock, packet) + end + + defp transmit(_packet, %__MODULE__{transport: transport}) do + raise ArgumentError, "unsupported transport type: #{inspect(transport)}" + end end diff --git a/lib/statix/conn_tracker.ex b/lib/statix/conn_tracker.ex new file mode 100644 index 0000000..daf8db7 --- /dev/null +++ b/lib/statix/conn_tracker.ex @@ -0,0 +1,86 @@ +defmodule Statix.ConnTracker do + @moduledoc false + + use GenServer + + alias Statix.Conn + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_opts) do + table = + :ets.new(:statix_conn_tracker, [:set, :protected, :named_table, read_concurrency: true]) + + {:ok, table} + end + + @doc """ + Stores a list of connections for the given key, replacing any existing connections. + + Closes old connections before storing new ones to prevent resource leaks. + Typically called when establishing a UDS connection pool with pool_size > 1. + """ + @spec set(key :: term(), connections :: [Conn.t()]) :: :ok + def set(key, connections) do + GenServer.call(__MODULE__, {:set, key, connections}) + end + + @doc """ + Retrieves a random connection for the given key. + + Returns `{:ok, conn}` with a randomly selected connection from the pool, + or `{:error, :not_found}` if no connections exist for the key. + """ + @spec get(key :: term()) :: {:ok, Conn.t()} | {:error, :not_found} + def get(key) do + case :ets.lookup(:statix_conn_tracker, key) do + [{^key, connections}] -> + random_connection = Enum.random(connections) + {:ok, random_connection} + + [] -> + {:error, :not_found} + end + end + + @impl true + def handle_call({:set, key, connections}, _from, state) do + # Close old connections before replacing them + case :ets.lookup(state, key) do + [{^key, old_connections}] -> + close_connections(old_connections) + + [] -> + :ok + end + + :ets.insert(state, {key, connections}) + {:reply, :ok, state} + end + + @impl true + def terminate(_reason, table) do + # Close all sockets before table is destroyed + :ets.foldl( + fn {_path, connections}, acc -> + close_connections(connections) + acc + end, + nil, + table + ) + + :ok + end + + defp close_connections(connections) do + Enum.each(connections, fn conn -> + if conn.transport == :uds and is_reference(conn.sock) do + :socket.close(conn.sock) + end + end) + end +end diff --git a/mix.exs b/mix.exs index 86e2721..a7a2b35 100644 --- a/mix.exs +++ b/mix.exs @@ -22,7 +22,10 @@ defmodule Statix.Mixfile do end def application() do - [extra_applications: [:logger]] + [ + mod: {Statix.Application, []}, + extra_applications: [:logger] + ] end defp description() do diff --git a/test/statix/uds_test.exs b/test/statix/uds_test.exs new file mode 100644 index 0000000..8b92354 --- /dev/null +++ b/test/statix/uds_test.exs @@ -0,0 +1,172 @@ +Code.require_file("../support/uds_test_server.exs", __DIR__) + +defmodule Statix.UDSTest do + use ExUnit.Case + + @moduletag :uds + + defmodule TestStatix do + use Statix, runtime_config: true + end + + setup_all do + socket_path = "/tmp/statix_test_#{:erlang.unique_integer([:positive])}.sock" + {:ok, _} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.Server) + + on_exit(fn -> + File.rm(socket_path) + end) + + {:ok, socket_path: socket_path} + end + + setup context do + Statix.UDSTestServer.setup(__MODULE__.Server) + TestStatix.connect(socket_path: context[:socket_path]) + :ok + end + + test "increment via UDS", _context do + TestStatix.increment("sample") + assert_receive {:test_server, _, "sample:1|c"} + + TestStatix.increment("sample", 2) + assert_receive {:test_server, _, "sample:2|c"} + + TestStatix.increment("sample", 3, tags: ["foo:bar", "baz"]) + assert_receive {:test_server, _, "sample:3|c|#foo:bar,baz"} + end + + test "decrement via UDS", _context do + TestStatix.decrement("sample") + assert_receive {:test_server, _, "sample:-1|c"} + + TestStatix.decrement("sample", 2) + assert_receive {:test_server, _, "sample:-2|c"} + + TestStatix.decrement("sample", 3, tags: ["foo:bar", "baz"]) + assert_receive {:test_server, _, "sample:-3|c|#foo:bar,baz"} + end + + test "gauge via UDS", _context do + TestStatix.gauge("sample", 2) + assert_receive {:test_server, _, "sample:2|g"} + + TestStatix.gauge("sample", 2.1) + assert_receive {:test_server, _, "sample:2.1|g"} + + TestStatix.gauge("sample", 3, tags: ["foo:bar", "baz"]) + assert_receive {:test_server, _, "sample:3|g|#foo:bar,baz"} + end + + test "histogram via UDS", _context do + TestStatix.histogram("sample", 2) + assert_receive {:test_server, _, "sample:2|h"} + + TestStatix.histogram("sample", 2.1) + assert_receive {:test_server, _, "sample:2.1|h"} + + TestStatix.histogram("sample", 3, tags: ["foo:bar", "baz"]) + assert_receive {:test_server, _, "sample:3|h|#foo:bar,baz"} + end + + test "timing via UDS", _context do + TestStatix.timing("sample", 2) + assert_receive {:test_server, _, "sample:2|ms"} + + TestStatix.timing("sample", 2.1) + assert_receive {:test_server, _, "sample:2.1|ms"} + + TestStatix.timing("sample", 3, tags: ["foo:bar", "baz"]) + assert_receive {:test_server, _, "sample:3|ms|#foo:bar,baz"} + end + + test "set via UDS", _context do + TestStatix.set("sample", "user1") + assert_receive {:test_server, _, "sample:user1|s"} + + TestStatix.set("sample", "user2", tags: ["foo:bar"]) + assert_receive {:test_server, _, "sample:user2|s|#foo:bar"} + end + + test "measure via UDS", _context do + result = TestStatix.measure("sample", [], fn -> :measured end) + assert result == :measured + assert_receive {:test_server, _, <<"sample:", _::binary>>} + end + + test "sample rate via UDS", _context do + TestStatix.increment("sample", 1, sample_rate: 1.0) + assert_receive {:test_server, _, "sample:1|c|@1.0"} + + TestStatix.increment("sample", 1, sample_rate: 0.0) + refute_received {:test_server, _, _} + end + + test "large packet over 1024 bytes via UDS maintains atomicity", _context do + # Create tags that will result in a packet > 1024 bytes + # Each tag is roughly 30 chars, so ~35 tags = ~1050 bytes total packet + tags = + for i <- 1..35 do + "very_long_tag_name_#{i}:very_long_tag_value_#{i}" + end + + TestStatix.increment("sample.with.long.metric.name", 1, tags: tags) + + # Verify we receive the complete packet atomically (all or nothing) + assert_receive {:test_server, _, packet}, 1000 + + # Verify packet structure is intact and complete + assert packet =~ ~r/^sample\.with\.long\.metric\.name:1\|c\|#/ + assert String.contains?(packet, "very_long_tag_name_1:very_long_tag_value_1") + assert String.contains?(packet, "very_long_tag_name_35:very_long_tag_value_35") + + # Verify packet size exceeds 1024 bytes + assert byte_size(packet) > 1024 + end + + test "very large packet over 4096 bytes via UDS maintains atomicity", _context do + # ~140 tags at 30 chars each = ~4200 bytes total + tags = + for i <- 1..140 do + "very_long_tag_name_#{i}:very_long_tag_value_#{i}" + end + + TestStatix.gauge("sample.metric.with.many.tags", 12345, tags: tags) + + assert_receive {:test_server, _, packet}, 1000 + + assert packet =~ ~r/^sample\.metric\.with\.many\.tags:12345\|g\|#/ + assert String.contains?(packet, "very_long_tag_name_1:very_long_tag_value_1") + assert String.contains?(packet, "very_long_tag_name_140:very_long_tag_value_140") + assert byte_size(packet) > 4096 + + # Verify atomicity: all 140 tags present (no truncation) + tag_count = packet |> String.split(",") |> length() + assert tag_count == 140 + end + + test "pooling with pool_size > 1 distributes traffic", context do + TestStatix.connect(socket_path: context[:socket_path], pool_size: 3) + + [{_, connections}] = :ets.lookup(:statix_conn_tracker, context[:socket_path]) + assert length(connections) == 3 + + socket_refs = + for i <- 1..1000 do + {:ok, conn} = Statix.ConnTracker.get(context[:socket_path]) + TestStatix.increment("pooled.metric") + assert_receive {:test_server, _, _packet} + # Return the socket reference + conn.sock + end + + socket_counts = Enum.frequencies(socket_refs) + assert map_size(socket_counts) == 3 + + Enum.each(socket_counts, fn {_sock, count} -> + assert count > 250 and count < 417, + "Expected roughly 333 uses per socket, got: #{inspect(socket_counts)}" + end) + end +end diff --git a/test/support/uds_test_server.exs b/test/support/uds_test_server.exs new file mode 100644 index 0000000..7f14085 --- /dev/null +++ b/test/support/uds_test_server.exs @@ -0,0 +1,99 @@ +defmodule Statix.UDSTestServer do + use GenServer + + def start_link(socket_path, test_module) do + GenServer.start_link(__MODULE__, socket_path, name: test_module) + end + + @impl true + def init(socket_path) do + unless Code.ensure_loaded?(:socket) do + {:stop, :socket_not_available} + else + {:ok, sock} = :socket.open(:local, :dgram, :default) + + addr = %{family: :local, path: String.to_charlist(socket_path)} + :ok = :socket.bind(sock, addr) + :ok = :socket.setopt(sock, :otp, :rcvbuf, 65536) + + case :socket.recvfrom(sock, 0, [], :nowait) do + {:ok, _} = result -> + send(self(), {:socket_data, result}) + + {:select, _select_info} -> + :ok + + {:error, reason} -> + {:stop, {:socket_error, reason}} + end + + {:ok, %{socket: sock, socket_path: socket_path, test: nil}} + end + end + + @impl true + def handle_call({:set_current_test, current_test}, _from, %{test: test} = state) do + if is_nil(test) or is_nil(current_test) do + {:reply, :ok, %{state | test: current_test}} + else + {:reply, :error, state} + end + end + + @impl true + def handle_info( + {:"$socket", socket, :select, _select_info}, + %{socket: socket, test: test} = state + ) do + case :socket.recvfrom(socket, 0, [], :nowait) do + {:ok, {_source, packet}} -> + if test, do: send(test, {:test_server, %{socket: socket}, packet}) + start_receive(socket) + {:noreply, state} + + {:select, _select_info} -> + {:noreply, state} + + {:error, reason} -> + IO.puts("UDS receive error: #{inspect(reason)}") + {:noreply, state} + end + end + + @impl true + def handle_info({:socket_data, {:ok, {_source, packet}}}, %{socket: socket, test: test} = state) do + if test, do: send(test, {:test_server, %{socket: socket}, packet}) + + start_receive(socket) + {:noreply, state} + end + + defp start_receive(socket) do + case :socket.recvfrom(socket, 0, [], :nowait) do + {:ok, {_source, _packet}} = result -> + send(self(), {:socket_data, result}) + + {:select, _select_info} -> + :ok + + {:error, _reason} -> + :ok + end + end + + @impl true + def terminate(_reason, %{socket: socket, socket_path: path}) do + :socket.close(socket) + _ = File.rm(path) + :ok + end + + def setup(test_module) do + :ok = set_current_test(test_module, self()) + ExUnit.Callbacks.on_exit(fn -> set_current_test(test_module, nil) end) + end + + defp set_current_test(test_module, test) do + GenServer.call(test_module, {:set_current_test, test}) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 77da001..f22f7df 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,6 +1,14 @@ Code.require_file("support/test_server.exs", __DIR__) -ExUnit.start() +# Exclude UDS tests on OTP < 22 (when :socket module is not available) +exclude = + if Code.ensure_loaded?(:socket) do + [] + else + [:uds] + end + +ExUnit.start(exclude: exclude) defmodule Statix.TestCase do use ExUnit.CaseTemplate