Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions lib/statix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ defmodule Statix do
def new(module, options) do
config = get_config(module, options)

# Determine transport based on socket_path presence
conn =
if config.socket_path do
Conn.new(config.socket_path, config.prefix)
Expand All @@ -396,12 +395,14 @@ defmodule Statix do
def open(%__MODULE__{conn: %{transport: :uds, sock: {:socket_path, path}} = conn, pool: pool}) do
# UDS sockets are socket references (not ports), so they cannot be registered as process names.
# Instead, store them in ConnTracker's ETS table.
Statix.ConnTracker.ensure_started()

connections =
Enum.map(pool, fn _name ->
Conn.open(conn)
end)

Statix.ConnTracker.set(path, connections)
Statix.ConnTracker.set(path, connections, conn_template: conn)
end

def open(%__MODULE__{conn: conn, pool: pool}) do
Expand All @@ -425,7 +426,14 @@ defmodule Statix do

case Statix.ConnTracker.get(path) do
{:ok, conn} ->
Conn.transmit(conn, type, key, to_string(value), options)
case Conn.transmit(conn, type, key, to_string(value), options) do
:ok ->
:ok

{:error, _reason} = error ->
Statix.ConnTracker.report_send_error(path)
error
end

{:error, :not_found} ->
{:error, :socket_not_found}
Expand Down Expand Up @@ -453,7 +461,6 @@ defmodule Statix do
end
end

# Takes first :sample_rate occurrence (standard keyword list behavior)
defp should_send?([]), do: true
defp should_send?([{:sample_rate, rate} | _]), do: rate >= :rand.uniform()
defp should_send?([_ | rest]), do: should_send?(rest)
Expand Down Expand Up @@ -484,7 +491,6 @@ defmodule Statix do
tags: tags
}

# Warn if both socket_path and host/port are specified
if config.socket_path do
has_custom_host = Keyword.has_key?(options, :host)
has_custom_port = Keyword.has_key?(options, :port)
Expand Down
17 changes: 9 additions & 8 deletions lib/statix/application.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
defmodule Statix.Application do
@moduledoc false

use Application

def start(_type, _args) do
children = [
Statix.ConnTracker
]

Supervisor.start_link(children, strategy: :one_for_one, name: Statix.Supervisor)
def ensure_started do
case Supervisor.start_link([Statix.ConnTracker],
strategy: :one_for_one,
name: Statix.Supervisor
) do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
{:error, reason} -> {:error, reason}
end
end
end
28 changes: 19 additions & 9 deletions lib/statix/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Statix.Conn do
# sock field holds different types depending on state and transport:
# - UDP: port (after open/1) or atom for process name
# - UDS: {:socket_path, path} before open/1, socket reference after
# socket_path field preserves the UDS path even after opening
defstruct [:sock, :address, :port, :prefix, :transport, :socket_path]

alias Statix.Packet
Expand Down Expand Up @@ -44,22 +43,33 @@ defmodule Statix.Conn do

def open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do
unless Code.ensure_loaded?(:socket) do
raise "Unix domain socket support requires OTP 22+. Current OTP version does not support :socket module."
raise "Unix domain socket support requires OTP 22+"
end

{:ok, sock} = :socket.open(:local, :dgram, :default)
path_addr = %{family: :local, path: String.to_charlist(path)}

case :socket.connect(sock, path_addr) do
:ok ->
%__MODULE__{conn | sock: sock}
case safe_open(conn) do
{:ok, opened} ->
opened

{:error, reason} ->
:socket.close(sock)
raise "Failed to connect to Unix domain socket at #{path}: #{inspect(reason)}"
end
end

def safe_open(%__MODULE__{transport: :uds, sock: {:socket_path, path}} = conn) do
with {:ok, sock} <- :socket.open(:local, :dgram, :default) do
path_addr = %{family: :local, path: String.to_charlist(path)}

case :socket.connect(sock, path_addr) do
:ok ->
{:ok, %__MODULE__{conn | sock: sock}}

{:error, reason} ->
:socket.close(sock)
{:error, reason}
end
end
end

def transmit(%__MODULE__{sock: sock, prefix: prefix} = conn, type, key, val, options)
when is_binary(val) and is_list(options) do
result =
Expand Down
188 changes: 155 additions & 33 deletions lib/statix/conn_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,65 +5,172 @@ defmodule Statix.ConnTracker do

alias Statix.Conn

require Logger

@backoff_steps [1_000, 5_000, 30_000, 60_000, 120_000, 300_000]

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

defdelegate ensure_started, to: Statix.Application

@impl true
def init(_opts) do
table =
:ets.new(:statix_conn_tracker, [:set, :protected, :named_table, read_concurrency: true])

{:ok, table}
{:ok, %{table: table, unhealthy: %{}, path_meta: %{}}}
end

@doc """
Stores a list of connections for the given key, replacing any existing connections.

Closes old connections before storing new ones to prevent resource leaks.
Typically called when establishing a UDS connection pool with pool_size > 1.
"""
@spec set(key :: term(), connections :: [Conn.t()]) :: :ok
def set(key, connections) do
GenServer.call(__MODULE__, {:set, key, connections})
@spec set(key :: term(), connections :: [Conn.t()], opts :: keyword()) :: :ok
def set(key, connections, opts \\ []) do
GenServer.call(__MODULE__, {:set, key, connections, opts})
end

@doc """
Retrieves a random connection for the given key.

Returns `{:ok, conn}` with a randomly selected connection from the pool,
or `{:error, :not_found}` if no connections exist for the key.
"""
@spec get(key :: term()) :: {:ok, Conn.t()} | {:error, :not_found}
def get(key) do
case :ets.lookup(:statix_conn_tracker, key) do
[{^key, connections}] ->
random_connection = Enum.random(connections)
{:ok, random_connection}
[{^key, [_ | _] = connections}] ->
{:ok, Enum.random(connections)}

[] ->
_ ->
{:error, :not_found}
end
end

@spec report_send_error(key :: term()) :: :ok
def report_send_error(key) do
GenServer.cast(__MODULE__, {:report_send_error, key})
end

@impl true
def handle_call({:set, key, connections}, _from, state) do
# Close old connections before replacing them
case :ets.lookup(state, key) do
[{^key, old_connections}] ->
close_connections(old_connections)

[] ->
:ok
def handle_call({:set, key, connections, opts}, _from, state) do
close_and_remove(state.table, key)
:ets.insert(state.table, {key, connections})

# Clear unhealthy state if present — a manual connect() supersedes the health-check loop.
# Cancel the pending timer so it doesn't close these fresh connections.
unhealthy =
case Map.pop(state.unhealthy, key) do
{nil, map} ->
map

{entry, map} ->
Process.cancel_timer(entry.timer_ref)
map
end

path_meta =
case Keyword.fetch(opts, :conn_template) do
{:ok, template} ->
Map.put(state.path_meta, key, %{
conn_template: template,
pool_size: length(connections)
})

:error ->
state.path_meta
end

{:reply, :ok, %{state | path_meta: path_meta, unhealthy: unhealthy}}
end

@impl true
def handle_cast({:report_send_error, path}, state) do
if Map.has_key?(state.unhealthy, path) do
state = update_in(state.unhealthy[path].lost_count, &(&1 + 1))
{:noreply, state}
else
case Map.fetch(state.path_meta, path) do
{:ok, %{conn_template: conn_template, pool_size: pool_size}} ->
# UDS DGRAM sockets to a dead server will never recover on their own;
# only opening new sockets after the server restarts can restore service.
close_and_remove(state.table, path)

delay = backoff_ms(0)
timer_ref = Process.send_after(self(), {:health_check, path}, delay)

unhealthy_entry = %{
backoff_index: 0,
timer_ref: timer_ref,
conn_template: conn_template,
pool_size: pool_size,
lost_count: 1
}

Logger.warning(
"Statix: UDS path #{path} marked unhealthy, " <>
"scheduling reconnect in #{delay}ms"
)

{:noreply, put_in(state.unhealthy[path], unhealthy_entry)}

:error ->
# No template stored — can't reconnect. This shouldn't happen.
Logger.error("Statix: UDS path #{path} has no conn_template, cannot reconnect")
{:noreply, state}
end
end
end

:ets.insert(state, {key, connections})
{:reply, :ok, state}
@impl true
def handle_info({:health_check, path}, state) do
case Map.fetch(state.unhealthy, path) do
{:ok, entry} ->
attempt_reconnect(path, entry, state)

:error ->
# No longer unhealthy (race with successful set?)
{:noreply, state}
end
end

# All-or-nothing reconnection strategy.
#
# Unlike UDP over a network, a UDS socket is a local-host resource: the server
# socket file either exists on the filesystem or it doesn't. There is no partial
# reachability. If we can open one DGRAM connection to it, we can open all of
# them; if we can't open one, we can't open any. So partial success doesn't need
# handling — any failure means total failure, and we retry the full pool later.
defp attempt_reconnect(path, entry, state) do
results =
Enum.map(1..entry.pool_size, fn _ ->
Conn.safe_open(entry.conn_template)
end)

{successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1))
opened = Enum.map(successes, fn {:ok, conn} -> conn end)

if failures == [] do
:ets.insert(state.table, {path, opened})

Logger.info(
"Statix: reconnected UDS path #{path} " <>
"after losing #{entry.lost_count} metric(s)"
)

{:noreply, %{state | unhealthy: Map.delete(state.unhealthy, path)}}
else
# All or nothing
close_connections(opened)

next_index = min(entry.backoff_index + 1, length(@backoff_steps) - 1)
delay = backoff_ms(next_index)
timer_ref = Process.send_after(self(), {:health_check, path}, delay)

Logger.warning(
"Statix: reconnect failed for UDS path #{path}, " <>
"#{entry.lost_count} metric(s) lost so far, retrying in #{delay}ms"
)

updated_entry = %{entry | backoff_index: next_index, timer_ref: timer_ref}
{:noreply, put_in(state.unhealthy[path], updated_entry)}
end
end

@impl true
def terminate(_reason, table) do
# Close all sockets before table is destroyed
def terminate(_reason, %{table: table}) do
:ets.foldl(
fn {_path, connections}, acc ->
close_connections(connections)
Expand All @@ -76,11 +183,26 @@ defmodule Statix.ConnTracker do
:ok
end

defp close_and_remove(table, path) do
case :ets.take(table, path) do
[{^path, connections}] -> close_connections(connections)
[] -> :ok
end
end

defp close_connections(connections) do
Enum.each(connections, fn conn ->
if conn.transport == :uds and is_reference(conn.sock) do
try do
:socket.close(conn.sock)
catch
_, _ -> :ok
end
end)
end

defp backoff_ms(index) do
base = Enum.at(@backoff_steps, index, List.last(@backoff_steps))
jitter = trunc(base * 0.1)
base - jitter + :rand.uniform(jitter * 2 + 1) - 1
end
end
1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ defmodule Statix.Mixfile do

def application() do
[
mod: {Statix.Application, []},
extra_applications: [:logger]
]
end
Expand Down
Loading
Loading