Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 77 additions & 6 deletions lib/statix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,24 @@ defmodule Statix do
config :statix, MyApp.Statix,
port: 8123

Unix domain sockets are also supported for local connections:

config :statix, MyApp.Statix,
socket_path: "/var/run/statsd.sock"

The following is a list of all the supported options:

* `:prefix` - (binary) all metrics sent to the StatsD-compatible
server through the configured Statix connection will be prefixed with the
value of this option. By default this option is not present.
* `:host` - (binary) the host where the StatsD-compatible server is running.
Defaults to `"127.0.0.1"`.
Defaults to `"127.0.0.1"`. Ignored if `:socket_path` is set.
* `:port` - (integer) the port (on `:host`) where the StatsD-compatible
server is running. Defaults to `8125`.
server is running. Defaults to `8125`. Ignored if `:socket_path` is set.
* `:socket_path` - (binary) path to a Unix domain socket for the StatsD-compatible
server. When this option is present, `:host` and `:port` are ignored and the
connection uses Unix domain sockets instead of UDP. Requires OTP 22+.
By default this option is not present.
* `:tags` - ([binary]) a list of global tags that will be sent with all
metrics. By default this option is not present.
See the "Tags" section for more information.
Expand Down Expand Up @@ -359,7 +368,14 @@ defmodule Statix do
@doc false
def new(module, options) do
config = get_config(module, options)
conn = Conn.new(config.host, config.port, config.prefix)

# Determine transport based on socket_path presence
conn =
if config.socket_path do
Conn.new(config.socket_path, config.prefix)
else
Conn.new(config.host, config.port, config.prefix)
end

%__MODULE__{
conn: conn,
Expand All @@ -375,6 +391,31 @@ defmodule Statix do
end

@doc false
def open(%__MODULE__{conn: %{transport: :uds} = conn, pool: pool}) do
# UDS sockets are socket references (not ports), so they cannot be registered as process names.
# Instead, store them in ETS for lookup without message passing overhead.
table_name = :"#{hd(pool)}_uds_sockets"

# Make sure the table exists, handle concurrent open/1 calls
case :ets.whereis(table_name) do
:undefined ->
try do
:ets.new(table_name, [:named_table, :public, :set, read_concurrency: true])
rescue
ArgumentError -> :ok
end

_ ->
:ok
end

# Open sockets and store in ETS pool
Enum.each(pool, fn name ->
opened_conn = Conn.open(conn)
:ets.insert(table_name, {name, opened_conn})
end)
end

def open(%__MODULE__{conn: conn, pool: pool}) do
Enum.each(pool, fn name ->
%{sock: sock} = Conn.open(conn)
Expand All @@ -384,16 +425,40 @@ defmodule Statix do

@doc false
def transmit(
%{conn: conn, pool: pool, tags: tags},
%{conn: %{transport: :uds}, pool: pool, tags: tags},
type,
key,
value,
options
)
when (is_binary(key) or is_list(key)) and is_list(options) do
sample_rate = Keyword.get(options, :sample_rate)
if should_send?(options) do
options = put_global_tags(options, tags)

name = pick_name(pool)
table_name = :"#{hd(pool)}_uds_sockets"

if is_nil(sample_rate) or sample_rate >= :rand.uniform() do
case :ets.lookup(table_name, name) do
[{^name, conn}] ->
Conn.transmit(conn, type, key, to_string(value), options)

[] ->
{:error, :socket_not_found}
end
else
:ok
end
end

def transmit(
%{conn: conn, pool: pool, tags: tags},
type,
key,
value,
options
)
when (is_binary(key) or is_list(key)) and is_list(options) do
if should_send?(options) do
options = put_global_tags(options, tags)

%{conn | sock: pick_name(pool)}
Expand All @@ -403,6 +468,11 @@ defmodule Statix do
end
end

# Takes first :sample_rate occurrence (standard keyword list behavior)
defp should_send?([]), do: true
defp should_send?([{:sample_rate, rate} | _]), do: rate >= :rand.uniform()
defp should_send?([_ | rest]), do: should_send?(rest)

defp pick_name([name]), do: name
defp pick_name(pool), do: Enum.random(pool)

Expand All @@ -424,6 +494,7 @@ defmodule Statix do
prefix: build_prefix(env, overrides),
host: Keyword.get(options, :host, "127.0.0.1"),
port: Keyword.get(options, :port, 8125),
socket_path: Keyword.get(options, :socket_path),
pool_size: Keyword.get(options, :pool_size, 1),
tags: tags
}
Expand Down
47 changes: 43 additions & 4 deletions lib/statix/conn.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
defmodule Statix.Conn do
@moduledoc false

defstruct [:sock, :address, :port, :prefix]
# sock field holds different types depending on state and transport:
# - UDP: port (after open/1) or atom for process name
# - UDS: {:socket_path, path} before open/1, socket reference after
defstruct [:sock, :address, :port, :prefix, :transport]

alias Statix.Packet

Expand All @@ -14,7 +17,7 @@ defmodule Statix.Conn do
def new(host, port, prefix) when is_list(host) or is_tuple(host) do
case :inet.getaddr(host, :inet) do
{:ok, address} ->
%__MODULE__{address: address, port: port, prefix: prefix}
%__MODULE__{address: address, port: port, prefix: prefix, transport: :udp}

{:error, reason} ->
raise(
Expand All @@ -24,11 +27,33 @@ defmodule Statix.Conn do
end
end

def open(%__MODULE__{} = conn) do
def new(socket_path, prefix) when is_binary(socket_path) do
%__MODULE__{prefix: prefix, transport: :uds, sock: {:socket_path, socket_path}}
end

def open(%__MODULE__{transport: :udp} = conn) do
{:ok, sock} = :gen_udp.open(0, active: false)
%__MODULE__{conn | sock: sock}
end

def open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do
unless Code.ensure_loaded?(:socket) do
raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module."
end

{:ok, sock} = :socket.open(:local, :dgram, :default)
path_addr = %{family: :local, path: String.to_charlist(path)}

case :socket.connect(sock, path_addr) do
:ok ->
%__MODULE__{conn | sock: sock}

{:error, reason} ->
:socket.close(sock)
raise "Failed to connect to Unix domain socket at #{path}: #{inspect(reason)}"
end
end

def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options)
when is_binary(val) and is_list(options) do
result =
Expand All @@ -47,7 +72,12 @@ defmodule Statix.Conn do
result
end

defp transmit(packet, %__MODULE__{address: address, port: port, sock: sock_name}) do
defp transmit(packet, %__MODULE__{
transport: :udp,
address: address,
port: port,
sock: sock_name
}) do
sock = Process.whereis(sock_name)

if sock do
Expand All @@ -56,4 +86,13 @@ defmodule Statix.Conn do
{:error, :port_closed}
end
end

defp transmit(packet, %__MODULE__{transport: :uds, sock: sock}) do
# UDS DGRAM sockets send atomically
:socket.send(sock, packet)
end

defp transmit(_packet, %__MODULE__{transport: transport}) do
raise ArgumentError, "unsupported transport type: #{inspect(transport)}"
end
end
148 changes: 148 additions & 0 deletions test/statix/uds_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
Code.require_file("../support/uds_test_server.exs", __DIR__)

defmodule Statix.UDSTest do
use ExUnit.Case

@moduletag :uds

defmodule TestStatix do
use Statix, runtime_config: true
end

setup_all do
socket_path = "/tmp/statix_test_#{:erlang.unique_integer([:positive])}.sock"
{:ok, _} = Statix.UDSTestServer.start_link(socket_path, __MODULE__.Server)

on_exit(fn ->
File.rm(socket_path)
end)

{:ok, socket_path: socket_path}
end

setup context do
Statix.UDSTestServer.setup(__MODULE__.Server)
TestStatix.connect(socket_path: context[:socket_path])
:ok
end

test "increment via UDS", _context do
TestStatix.increment("sample")
assert_receive {:test_server, _, "sample:1|c"}

TestStatix.increment("sample", 2)
assert_receive {:test_server, _, "sample:2|c"}

TestStatix.increment("sample", 3, tags: ["foo:bar", "baz"])
assert_receive {:test_server, _, "sample:3|c|#foo:bar,baz"}
end

test "decrement via UDS", _context do
TestStatix.decrement("sample")
assert_receive {:test_server, _, "sample:-1|c"}

TestStatix.decrement("sample", 2)
assert_receive {:test_server, _, "sample:-2|c"}

TestStatix.decrement("sample", 3, tags: ["foo:bar", "baz"])
assert_receive {:test_server, _, "sample:-3|c|#foo:bar,baz"}
end

test "gauge via UDS", _context do
TestStatix.gauge("sample", 2)
assert_receive {:test_server, _, "sample:2|g"}

TestStatix.gauge("sample", 2.1)
assert_receive {:test_server, _, "sample:2.1|g"}

TestStatix.gauge("sample", 3, tags: ["foo:bar", "baz"])
assert_receive {:test_server, _, "sample:3|g|#foo:bar,baz"}
end

test "histogram via UDS", _context do
TestStatix.histogram("sample", 2)
assert_receive {:test_server, _, "sample:2|h"}

TestStatix.histogram("sample", 2.1)
assert_receive {:test_server, _, "sample:2.1|h"}

TestStatix.histogram("sample", 3, tags: ["foo:bar", "baz"])
assert_receive {:test_server, _, "sample:3|h|#foo:bar,baz"}
end

test "timing via UDS", _context do
TestStatix.timing("sample", 2)
assert_receive {:test_server, _, "sample:2|ms"}

TestStatix.timing("sample", 2.1)
assert_receive {:test_server, _, "sample:2.1|ms"}

TestStatix.timing("sample", 3, tags: ["foo:bar", "baz"])
assert_receive {:test_server, _, "sample:3|ms|#foo:bar,baz"}
end

test "set via UDS", _context do
TestStatix.set("sample", "user1")
assert_receive {:test_server, _, "sample:user1|s"}

TestStatix.set("sample", "user2", tags: ["foo:bar"])
assert_receive {:test_server, _, "sample:user2|s|#foo:bar"}
end

test "measure via UDS", _context do
result = TestStatix.measure("sample", [], fn -> :measured end)
assert result == :measured
assert_receive {:test_server, _, <<"sample:", _::binary>>}
end

test "sample rate via UDS", _context do
TestStatix.increment("sample", 1, sample_rate: 1.0)
assert_receive {:test_server, _, "sample:1|c|@1.0"}

TestStatix.increment("sample", 1, sample_rate: 0.0)
refute_received {:test_server, _, _}
end

test "large packet over 1024 bytes via UDS maintains atomicity", _context do
# Create tags that will result in a packet > 1024 bytes
# Each tag is roughly 30 chars, so ~35 tags = ~1050 bytes total packet
tags =
for i <- 1..35 do
"very_long_tag_name_#{i}:very_long_tag_value_#{i}"
end

TestStatix.increment("sample.with.long.metric.name", 1, tags: tags)

# Verify we receive the complete packet atomically (all or nothing)
assert_receive {:test_server, _, packet}, 1000

# Verify packet structure is intact and complete
assert packet =~ ~r/^sample\.with\.long\.metric\.name:1\|c\|#/
assert String.contains?(packet, "very_long_tag_name_1:very_long_tag_value_1")
assert String.contains?(packet, "very_long_tag_name_35:very_long_tag_value_35")

# Verify packet size exceeds 1024 bytes
assert byte_size(packet) > 1024
end

test "very large packet over 4096 bytes via UDS maintains atomicity", _context do
# ~140 tags at 30 chars each = ~4200 bytes total
tags =
for i <- 1..140 do
"very_long_tag_name_#{i}:very_long_tag_value_#{i}"
end

TestStatix.gauge("sample.metric.with.many.tags", 12345, tags: tags)

assert_receive {:test_server, _, packet}, 1000

assert packet =~ ~r/^sample\.metric\.with\.many\.tags:12345\|g\|#/
assert String.contains?(packet, "very_long_tag_name_1:very_long_tag_value_1")
assert String.contains?(packet, "very_long_tag_name_140:very_long_tag_value_140")
assert byte_size(packet) > 4096

# Verify atomicity: all 140 tags present (no truncation)
tag_count = packet |> String.split(",") |> length()
assert tag_count == 140
end
end
Loading
Loading