Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 60 additions & 6 deletions lib/statix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,24 @@ defmodule Statix do
config :statix, MyApp.Statix,
port: 8123

Unix domain sockets are also supported for local connections:

config :statix, MyApp.Statix,
socket_path: "/var/run/statsd.sock"

The following is a list of all the supported options:

* `:prefix` - (binary) all metrics sent to the StatsD-compatible
server through the configured Statix connection will be prefixed with the
value of this option. By default this option is not present.
* `:host` - (binary) the host where the StatsD-compatible server is running.
Defaults to `"127.0.0.1"`.
Defaults to `"127.0.0.1"`. Ignored if `:socket_path` is set.
* `:port` - (integer) the port (on `:host`) where the StatsD-compatible
server is running. Defaults to `8125`.
server is running. Defaults to `8125`. Ignored if `:socket_path` is set.
* `:socket_path` - (binary) path to a Unix domain socket for the StatsD-compatible
server. When this option is present, `:host` and `:port` are ignored and the
connection uses Unix domain sockets instead of UDP. Requires OTP 22+.
By default this option is not present.
* `:tags` - ([binary]) a list of global tags that will be sent with all
metrics. By default this option is not present.
See the "Tags" section for more information.
Expand Down Expand Up @@ -359,7 +368,14 @@ defmodule Statix do
@doc false
def new(module, options) do
config = get_config(module, options)
conn = Conn.new(config.host, config.port, config.prefix)

# Determine transport based on socket_path presence
conn =
if config.socket_path do
Conn.new(config.socket_path, config.prefix)
else
Conn.new(config.host, config.port, config.prefix)
end

%__MODULE__{
conn: conn,
Expand All @@ -375,6 +391,17 @@ defmodule Statix do
end

@doc false
def open(%__MODULE__{conn: %{transport: :uds, sock: {:socket_path, path}} = conn, pool: pool}) do
# UDS sockets are socket references (not ports), so they cannot be registered as process names.
# Instead, store them in ConnTracker's ETS table.
connections =
Enum.map(pool, fn _name ->
Conn.open(conn)
end)

Statix.ConnTracker.set(path, connections)
end

def open(%__MODULE__{conn: conn, pool: pool}) do
Enum.each(pool, fn name ->
%{sock: sock} = Conn.open(conn)
Expand All @@ -384,16 +411,37 @@ defmodule Statix do

@doc false
def transmit(
%{conn: conn, pool: pool, tags: tags},
%{conn: %{transport: :uds, socket_path: path}, tags: tags},
type,
key,
value,
options
)
when (is_binary(key) or is_list(key)) and is_list(options) do
sample_rate = Keyword.get(options, :sample_rate)
if should_send?(options) do
options = put_global_tags(options, tags)

case Statix.ConnTracker.get(path) do
{:ok, conn} ->
Conn.transmit(conn, type, key, to_string(value), options)

{:error, :not_found} ->
{:error, :socket_not_found}
end
else
:ok
end
end

if is_nil(sample_rate) or sample_rate >= :rand.uniform() do
def transmit(
%{conn: conn, pool: pool, tags: tags},
type,
key,
value,
options
)
when (is_binary(key) or is_list(key)) and is_list(options) do
if should_send?(options) do
options = put_global_tags(options, tags)

%{conn | sock: pick_name(pool)}
Expand All @@ -403,6 +451,11 @@ defmodule Statix do
end
end

# Takes first :sample_rate occurrence (standard keyword list behavior)
defp should_send?([]), do: true
defp should_send?([{:sample_rate, rate} | _]), do: rate >= :rand.uniform()
defp should_send?([_ | rest]), do: should_send?(rest)

defp pick_name([name]), do: name
defp pick_name(pool), do: Enum.random(pool)

Expand All @@ -424,6 +477,7 @@ defmodule Statix do
prefix: build_prefix(env, overrides),
host: Keyword.get(options, :host, "127.0.0.1"),
port: Keyword.get(options, :port, 8125),
socket_path: Keyword.get(options, :socket_path),
pool_size: Keyword.get(options, :pool_size, 1),
tags: tags
}
Expand Down
13 changes: 13 additions & 0 deletions lib/statix/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Statix.Application do
@moduledoc false

use Application

def start(_type, _args) do
children = [
Statix.ConnTracker
]

Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor)
end
end
53 changes: 49 additions & 4 deletions lib/statix/conn.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
defmodule Statix.Conn do
@moduledoc false

defstruct [:sock, :address, :port, :prefix]
# sock field holds different types depending on state and transport:
# - UDP: port (after open/1) or atom for process name
# - UDS: {:socket_path, path} before open/1, socket reference after
# socket_path field preserves the UDS path even after opening
defstruct [:sock, :address, :port, :prefix, :transport, :socket_path]

alias Statix.Packet

Expand All @@ -14,7 +18,7 @@ defmodule Statix.Conn do
def new(host, port, prefix) when is_list(host) or is_tuple(host) do
case :inet.getaddr(host, :inet) do
{:ok, address} ->
%__MODULE__{address: address, port: port, prefix: prefix}
%__MODULE__{address: address, port: port, prefix: prefix, transport: :udp}

{:error, reason} ->
raise(
Expand All @@ -24,11 +28,38 @@ defmodule Statix.Conn do
end
end

def open(%__MODULE__{} = conn) do
def new(socket_path, prefix) when is_binary(socket_path) do
%__MODULE__{
prefix: prefix,
transport: :uds,
sock: {:socket_path, socket_path},
socket_path: socket_path
}
end

def open(%__MODULE__{transport: :udp} = conn) do
{:ok, sock} = :gen_udp.open(0, active: false)
%__MODULE__{conn | sock: sock}
end

def open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do
unless Code.ensure_loaded?(:socket) do
raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module."
end

{:ok, sock} = :socket.open(:local, :dgram, :default)
path_addr = %{family: :local, path: String.to_charlist(path)}

case :socket.connect(sock, path_addr) do
:ok ->
%__MODULE__{conn | sock: sock}

{:error, reason} ->
:socket.close(sock)
raise "Failed to connect to Unix domain socket at #{path}: #{inspect(reason)}"
end
end

def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options)
when is_binary(val) and is_list(options) do
result =
Expand All @@ -47,7 +78,12 @@ defmodule Statix.Conn do
result
end

defp transmit(packet, %__MODULE__{address: address, port: port, sock: sock_name}) do
defp transmit(packet, %__MODULE__{
transport: :udp,
address: address,
port: port,
sock: sock_name
}) do
sock = Process.whereis(sock_name)

if sock do
Expand All @@ -56,4 +92,13 @@ defmodule Statix.Conn do
{:error, :port_closed}
end
end

defp transmit(packet, %__MODULE__{transport: :uds, sock: sock}) do
# UDS DGRAM sockets send atomically
:socket.send(sock, packet)
end

defp transmit(_packet, %__MODULE__{transport: transport}) do
raise ArgumentError, "unsupported transport type: #{inspect(transport)}"
end
end
70 changes: 70 additions & 0 deletions lib/statix/conn_tracker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule Statix.ConnTracker do
@moduledoc false

use GenServer

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
def init(_opts) do
table =
:ets.new(:statix_conn_tracker, [:set, :protected, :named_table, read_concurrency: true])

{:ok, table}
end

def set(key, connections) do
GenServer.call(__MODULE__, {:set, key, connections})
end

def get(key) do
case :ets.lookup(:statix_conn_tracker, key) do
[{^key, connections}] ->
random_connection = Enum.random(connections)
{:ok, random_connection}

[] ->
{:error, :not_found}
end
end

@impl true
def handle_call({:set, key, connections}, _from, state) do
# Close old connections before replacing them
case :ets.lookup(state, key) do
[{^key, old_connections}] ->
close_connections(old_connections)

[] ->
:ok
end

:ets.insert(state, {key, connections})
{:reply, :ok, state}
end

@impl true
def terminate(_reason, table) do
# Close all sockets before table is destroyed
:ets.foldl(
fn {_path, connections}, acc ->
close_connections(connections)
acc
end,
nil,
table
)

:ok
end

defp close_connections(connections) do
Enum.each(connections, fn conn ->
if conn.transport == :uds and is_reference(conn.sock) do
:socket.close(conn.sock)
end
end)
end
end
5 changes: 4 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ defmodule Statix.Mixfile do
end

def application() do
[extra_applications: [:logger]]
[
mod: {Statix.Application, []},
extra_applications: [:logger]
]
end

defp description() do
Expand Down
Loading
Loading