Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions lib/statix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ defmodule Statix do
def new(module, options) do
config = get_config(module, options)

# Determine transport based on socket_path presence
conn =
if config.socket_path do
Conn.new(config.socket_path, config.prefix)
Expand All @@ -394,14 +393,12 @@ defmodule Statix do

@doc false
def open(%__MODULE__{conn: %{transport: :uds, sock: {:socket_path, path}} = conn, pool: pool}) do
# UDS sockets are socket references (not ports), so they cannot be registered as process names.
# Instead, store them in ConnTracker's ETS table.
connections =
Enum.map(pool, fn _name ->
Conn.open(conn)
end)

Statix.ConnTracker.set(path, connections)
Statix.ConnTracker.set(path, connections, conn_template: conn)
end

def open(%__MODULE__{conn: conn, pool: pool}) do
Expand All @@ -425,7 +422,14 @@ defmodule Statix do

case Statix.ConnTracker.get(path) do
{:ok, conn} ->
Conn.transmit(conn, type, key, to_string(value), options)
case Conn.transmit(conn, type, key, to_string(value), options) do
:ok ->
:ok

{:error, _reason} = error ->
Statix.ConnTracker.report_send_error(path)
error
end

{:error, :not_found} ->
{:error, :socket_not_found}
Expand Down Expand Up @@ -453,7 +457,6 @@ defmodule Statix do
end
end

# Takes first :sample_rate occurrence (standard keyword list behavior)
defp should_send?([]), do: true
defp should_send?([{:sample_rate, rate} | _]), do: rate >= :rand.uniform()
defp should_send?([_ | rest]), do: should_send?(rest)
Expand Down Expand Up @@ -484,7 +487,6 @@ defmodule Statix do
tags: tags
}

# Warn if both socket_path and host/port are specified
if config.socket_path do
has_custom_host = Keyword.has_key?(options, :host)
has_custom_port = Keyword.has_key?(options, :port)
Expand Down
6 changes: 2 additions & 4 deletions lib/statix/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ defmodule Statix.Application do

use Application

@impl true
def start(_type, _args) do
children = [
Statix.ConnTracker
]

children = [Statix.ConnTracker]
Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor)
end
end
28 changes: 19 additions & 9 deletions lib/statix/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Statix.Conn do
# sock field holds different types depending on state and transport:
# - UDP: port (after open/1) or atom for process name
# - UDS: {:socket_path, path} before open/1, socket reference after
# socket_path field preserves the UDS path even after opening
defstruct [:sock, :address, :port, :prefix, :transport, :socket_path]

alias Statix.Packet
Expand Down Expand Up @@ -44,22 +43,33 @@ defmodule Statix.Conn do

def open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do
unless Code.ensure_loaded?(:socket) do
raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module."
raise "Unix domain socket support requires OTP 22+"
end

{:ok, sock} = :socket.open(:local, :dgram, :default)
path_addr = %{family: :local, path: String.to_charlist(path)}

case :socket.connect(sock, path_addr) do
:ok ->
%__MODULE__{conn | sock: sock}
case safe_open(conn) do
{:ok, opened} ->
opened

{:error, reason} ->
:socket.close(sock)
raise "Failed to connect to Unix domain socket at #{path}: #{inspect(reason)}"
end
end

def safe_open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do
with {:ok, sock} <- :socket.open(:local, :dgram, :default) do
path_addr = %{family: :local, path: String.to_charlist(path)}

case :socket.connect(sock, path_addr) do
:ok ->
{:ok, %__MODULE__{conn | sock: sock}}

{:error, reason} ->
:socket.close(sock)
{:error, reason}
end
end
end

def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options)
when is_binary(val) and is_list(options) do
result =
Expand Down
176 changes: 143 additions & 33 deletions lib/statix/conn_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ defmodule Statix.ConnTracker do

alias Statix.Conn

require Logger

@backoff_steps [1_000, 5_000, 30_000, 60_000, 120_000, 300_000]
@max_backoff_index length(@backoff_steps) - 1

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
Expand All @@ -14,56 +19,146 @@ defmodule Statix.ConnTracker do
table =
:ets.new(:statix_conn_tracker, [:set, :protected, :named_table, read_concurrency: true])

{:ok, table}
{:ok, %{table: table, paths: %{}}}
end

@doc """
Stores a list of connections for the given key, replacing any existing connections.

Closes old connections before storing new ones to prevent resource leaks.
Typically called when establishing a UDS connection pool with pool_size > 1.
"""
@spec set(key :: term(), connections :: [Conn.t()]) :: :ok
def set(key, connections) do
GenServer.call(__MODULE__, {:set, key, connections})
@spec set(key :: term(), connections :: [Conn.t()], opts :: keyword()) :: :ok
def set(key, connections, opts \\ []) do
GenServer.call(__MODULE__, {:set, key, connections, opts})
end

@doc """
Retrieves a random connection for the given key.

Returns `{:ok, conn}` with a randomly selected connection from the pool,
or `{:error, :not_found}` if no connections exist for the key.
"""
@spec get(key :: term()) :: {:ok, Conn.t()} | {:error, :not_found}
def get(key) do
case :ets.lookup(:statix_conn_tracker, key) do
[{^key, connections}] ->
random_connection = Enum.random(connections)
{:ok, random_connection}
[{^key, [_ | _] = connections}] ->
{:ok, Enum.random(connections)}

[] ->
_ ->
{:error, :not_found}
end
end

@spec report_send_error(key :: term()) :: :ok
def report_send_error(key) do
GenServer.cast(__MODULE__, {:report_send_error, key})
end

@impl true
def handle_call({:set, key, connections}, _from, state) do
# Close old connections before replacing them
case :ets.lookup(state, key) do
[{^key, old_connections}] ->
close_connections(old_connections)

[] ->
:ok
def handle_call({:set, key, connections, opts}, _from, state) do
close_and_remove(state.table, key)
:ets.insert(state.table, {key, connections})

case get_in(state.paths, [key, :health]) do
%{timer_ref: ref} -> Process.cancel_timer(ref)
_ -> :ok
end

:ets.insert(state, {key, connections})
{:reply, :ok, state}
paths =
case Keyword.fetch(opts, :conn_template) do
{:ok, template} ->
Map.put(state.paths, key, %{
conn_template: template,
pool_size: length(connections),
health: :ok
})

:error ->
if Map.has_key?(state.paths, key) do
put_in(state.paths, [key, :health], :ok)
else
state.paths
end
end

{:reply, :ok, %{state | paths: paths}}
end

@impl true
def handle_cast({:report_send_error, path}, state) do
case Map.fetch(state.paths, path) do
{:ok, %{health: %{} = health}} ->
paths = put_in(state.paths, [path, :health, :lost_count], health.lost_count + 1)
{:noreply, %{state | paths: paths}}

{:ok, %{health: :ok} = _path_entry} ->
close_and_remove(state.table, path)

delay = backoff_ms(0)
timer_ref = Process.send_after(self(), {:health_check, path}, delay)

Logger.warning(
"Statix: UDS path #{path} marked unhealthy, " <>
"scheduling reconnect in #{delay}ms"
)

health = %{backoff_index: 0, timer_ref: timer_ref, lost_count: 1}
paths = put_in(state.paths, [path, :health], health)
{:noreply, %{state | paths: paths}}

:error ->
Logger.error("Statix: UDS path #{path} has no conn_template, cannot reconnect")
{:noreply, state}
end
end

@impl true
def terminate(_reason, table) do
# Close all sockets before table is destroyed
def handle_info({:health_check, path}, state) do
case Map.fetch(state.paths, path) do
{:ok, %{health: %{} = _health} = path_info} ->
attempt_reconnect(path, path_info, state)

_ ->
# Not unhealthy (race with successful set, or path removed)
{:noreply, state}
end
end

# All-or-nothing reconnection strategy.
#
# Unlike UDP over a network, a UDS socket is a local-host resource: the server
# socket file either exists on the filesystem or it doesn't. There is no partial
# reachability. If we can open one DGRAM connection to it, we can open all of
# them; if we can't open one, we can't open any. So partial success doesn't need
# handling — any failure means total failure, and we retry the full pool later.
defp attempt_reconnect(path, %{health: health} = path_info, state) do
results =
Enum.map(1..path_info.pool_size, fn _ ->
Conn.safe_open(path_info.conn_template)
end)

{successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1))
opened = Enum.map(successes, fn {:ok, conn} -> conn end)

if failures == [] do
:ets.insert(state.table, {path, opened})

Logger.info(
"Statix: reconnected UDS path #{path} " <>
"after losing #{health.lost_count} metric(s)"
)

paths = put_in(state.paths, [path, :health], :ok)
{:noreply, %{state | paths: paths}}
else
close_connections(opened)

next_index = min(health.backoff_index + 1, @max_backoff_index)
delay = backoff_ms(next_index)
timer_ref = Process.send_after(self(), {:health_check, path}, delay)

Logger.warning(
"Statix: reconnect failed for UDS path #{path}, " <>
"#{health.lost_count} metric(s) lost so far, retrying in #{delay}ms"
)

new_health = %{health | backoff_index: next_index, timer_ref: timer_ref}
paths = put_in(state.paths, [path, :health], new_health)
{:noreply, %{state | paths: paths}}
end
end

@impl true
def terminate(_reason, %{table: table}) do
:ets.foldl(
fn {_path, connections}, acc ->
close_connections(connections)
Expand All @@ -76,11 +171,26 @@ defmodule Statix.ConnTracker do
:ok
end

defp close_and_remove(table, path) do
case :ets.take(table, path) do
[{^path, connections}] -> close_connections(connections)
[] -> :ok
end
end

defp close_connections(connections) do
Enum.each(connections, fn conn ->
if conn.transport == :uds and is_reference(conn.sock) do
try do
:socket.close(conn.sock)
catch
_, _ -> :ok
end
end)
end

defp backoff_ms(index) do
base = Enum.at(@backoff_steps, index)
jitter = trunc(base * 0.1)
base - jitter + :rand.uniform(jitter * 2 + 1) - 1
end
end
Loading
Loading