Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 134 additions & 3 deletions lib/stargate/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ defmodule Stargate.Connection do
topic: String.t()
}

@typedoc """
Calculates custom connection backoff when unable to connect to Pulsar.
Can be used to implement exponential backoff.
Note that for an MFA tuple, the arity of the function should be `length(args) + 1` since the first
arg will be the attempt.
"""
@type backoff_calculator ::
(attempt :: non_neg_integer() -> timeout()) | {module(), fun :: atom(), args :: list()}

@callback handle_send_error(reason :: term, ref :: term, state :: term) :: :ok
@callback handle_connected(state :: term) :: :ok

@doc """
The Connection using macro provides the common websocket connection
and keepalive functionality into a single line for replicating connection
Expand All @@ -24,15 +36,22 @@ defmodule Stargate.Connection do
quote do
use WebSockex

@behaviour Stargate.Connection
@ping_interval 30_000

require Logger

@impl WebSockex
def handle_connect(_conn, state) do
:timer.send_interval(30_000, :send_ping)
{:ok, state}
tref = Process.send_after(self(), :send_ping, @ping_interval)
:ok = apply(__MODULE__, :handle_connected, [state])
{:ok, Map.put(state, :ping_timer_ref, tref)}
end

@impl WebSockex
def handle_info(:send_ping, state) do
{:reply, :ping, state}
tref = Process.send_after(self(), :send_ping, @ping_interval)
{:reply, :ping, Map.put(state, :ping_timer_ref, tref)}
end

@impl WebSockex
Expand All @@ -44,6 +63,91 @@ defmodule Stargate.Connection do
def handle_pong(_pong_frame, state) do
{:ok, state}
end

@impl WebSockex
def handle_disconnect(
%{reason: reason, attempt_number: attempt},
%{backoff_calculator: backoff_calculator} = state
) do
case Map.get(state, :ping_timer_ref) do
nil ->
:ok

ref ->
Process.cancel_timer(ref)
:ok
end

backoff = Stargate.Connection.calculate_backoff(backoff_calculator, attempt)

Logger.warning(
"[Stargate] disconnected",
reason: inspect(reason),
url: state.url,
attempt: attempt,
backoff_ms: backoff
)

{:backoff, backoff, state}
end

@impl WebSockex
def handle_send_result(:ok, _frame, _ref, state) do
{:ok, state}
end

@impl WebSockex
def handle_send_result({:error, reason}, :ping, _ref, state) do
Logger.warning("[Stargate] error sending ping", reason: inspect(reason))
{:close, state}
end

@impl WebSockex
def handle_send_result({:error, %WebSockex.ConnError{} = reason}, frame, ref, state) do
Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)

:ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state])
{:close, state}
end

@impl WebSockex
def handle_send_result({:error, %WebSockex.NotConnectedError{} = reason}, frame, ref, state) do
Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)

:ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state])
{:ok, state}
end

@impl WebSockex
def handle_send_result({:error, reason}, frame, ref, state) do
Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)

:ok = apply(__MODULE__, :handle_send_error, [:unknown, ref, state])
{:ok, state}
end

@impl WebSockex
def terminate({reason, stacktrace}, state) when is_exception(reason) do
Logger.error(Exception.format(:error, reason, stacktrace))
end

@impl WebSockex
def terminate(_reason, _state) do
:ok
end

@impl Stargate.Connection
def handle_send_error(reason, ref, state) do
:ok
end

@impl Stargate.Connection
def handle_connected(state) do
:ok
end

defoverridable handle_send_error: 3,
handle_connected: 1
end
end

Expand Down Expand Up @@ -102,6 +206,33 @@ defmodule Stargate.Connection do
|> Enum.map(&transform_auth/1)
end

@doc false
@spec calculate_backoff(attempt :: non_neg_integer()) :: timeout()
def calculate_backoff(_attempt) do
2_000
end

@doc false
@spec calculate_backoff(backoff_calculator(), attempt :: non_neg_integer()) :: timeout()
def calculate_backoff(calc, attempt) do
case calc do
{module, function, args} ->
apply(module, function, [attempt | args])

fun when is_function(fun, 1) ->
fun.(attempt)

_ ->
raise ArgumentError,
"Backoff calculator does not conform to spec of {module, function, args} or fun/1"
end
end

@doc false
def default_backoff_calculator() do
{Stargate.Connection, :calculate_backoff, []}
end

defp transform_auth({:ssl_options, _opts} = ssl_opts), do: ssl_opts

defp transform_auth({:auth_token, token}) do
Expand Down
36 changes: 28 additions & 8 deletions lib/stargate/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Stargate.Producer do
use Puid
import Stargate.Supervisor, only: [via: 2]
alias Stargate.Producer.{Acknowledger, QueryParams}
alias Stargate.Connection

@typedoc """
A URL defining the host and topic to which a Stargate producer can
Expand Down Expand Up @@ -99,7 +100,8 @@ defmodule Stargate.Producer do
When calling `produce/3` the third argument must be an MFA tuple which is used by
the producer's acknowledger process to asynchronously perform acknowledgement that the
message was received by the cluster successfully. This is used to avoid blocking the
calling process for performance reasons.
calling process for performance reasons. The result of the produce is added as a first
argument when calling the MFA tuple which is either `:ok` or `{:error, reason}`.
"""
@spec produce(producer(), message() | [message()], {module(), atom(), [term()]}) ::
:ok | {:error, term()}
Expand Down Expand Up @@ -130,7 +132,8 @@ defmodule Stargate.Producer do
:tenant,
:namespace,
:topic,
:query_params
:query_params,
:backoff_calculator
]
end

Expand All @@ -153,6 +156,7 @@ defmodule Stargate.Producer do
* `persistence` can be one of "persistent" or "non-persistent" per the Pulsar
specification of topics as being in-memory only or persisted to the brokers' disks.
Defaults to "persistent".
* `backoff_calculator` See `Stargate.Connection.t:backoff_calculator/0`.
* `query_params` is a map containing any or all of the following:

* `send_timeout` the time at which a produce operation will time out; defaults to 30 seconds
Expand All @@ -178,11 +182,15 @@ defmodule Stargate.Producer do
query_params = QueryParams.build_params(query_params_config)
registry = Keyword.fetch!(args, :registry)

backoff_calculator =
Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator())

state =
args
|> Stargate.Connection.connection_settings(:producer, query_params)
|> Map.put(:query_params, query_params_config)
|> Map.put(:registry, registry)
|> Map.put(:backoff_calculator, backoff_calculator)
|> (fn fields -> struct(State, fields) end).()

server_opts =
Expand All @@ -202,17 +210,16 @@ defmodule Stargate.Producer do

@impl WebSockex
def handle_cast({:send, payload, ctx, ack}, state) do
Acknowledger.produce(
ack_name =
via(
state.registry,
{:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
"#{state.topic}"}
),
ctx,
ack
)
)

Acknowledger.produce(ack_name, ctx, ack)

{:reply, {:text, payload}, state}
{:reply, {:text, payload}, ctx, state}
end

@impl WebSockex
Expand All @@ -235,6 +242,19 @@ defmodule Stargate.Producer do
{:ok, state}
end

@impl Connection
def handle_send_error(reason, ctx, state) do
:ok =
state.registry
|> via(
{:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
"#{state.topic}"}
)
|> Acknowledger.ack({:error, reason, ctx})

:ok
end

defp construct_payload(%{"payload" => _payload, "context" => context} = message) do
encoded_message =
message
Expand Down
6 changes: 3 additions & 3 deletions lib/stargate/producer/acknowledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Stargate.Producer.Acknowledger do
send(pid, {ref, :ack})

{module, function, args} ->
apply(module, function, args)
apply(module, function, [:ok | args])
end

{:noreply, new_state}
Expand All @@ -86,8 +86,8 @@ defmodule Stargate.Producer.Acknowledger do
{pid, ref} when is_pid(pid) ->
send(pid, {ref, :error, reason})

_mfa ->
Logger.error("Failed to execute produce for reason : #{inspect(reason)}")
{module, function, args} ->
apply(module, function, [{:error, reason} | args])
end

{:noreply, new_state}
Expand Down
24 changes: 22 additions & 2 deletions lib/stargate/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Stargate.Receiver do
import Stargate.Supervisor, only: [via: 2]
alias Stargate.{Consumer, Reader}
alias Stargate.Receiver.Dispatcher
alias Stargate.Connection

@typedoc "A string identifier assigned to each message by the cluster"
@type message_id :: String.t()
Expand Down Expand Up @@ -57,7 +58,8 @@ defmodule Stargate.Receiver do
:tenant,
:namespace,
:topic,
:query_params
:query_params,
:backoff_calculator
]
end

Expand Down Expand Up @@ -89,6 +91,7 @@ defmodule Stargate.Receiver do
on the received messages. Defaults to 1.
* `handler_init_args` is any term that will be passed to the message handler to initialize
its state when a stateful handler is desired. Defaults to an empty list.
* `backoff_calculator` See `Stargate.Connection.t:backoff_calculator/0`.
* `query_params` is a map containing any or all of the following:

# Consumer
Expand Down Expand Up @@ -122,6 +125,9 @@ defmodule Stargate.Receiver do
registry = Keyword.fetch!(args, :registry)
query_params_config = Keyword.get(args, :query_params)

backoff_calculator =
Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator())

query_params =
case type do
:consumer -> Consumer.QueryParams.build_params(query_params_config)
Expand All @@ -130,7 +136,8 @@ defmodule Stargate.Receiver do

setup_state = %{
registry: registry,
query_params: query_params_config
query_params: query_params_config,
backoff_calculator: backoff_calculator
}

state =
Expand All @@ -154,6 +161,19 @@ defmodule Stargate.Receiver do
WebSockex.start_link(state.url, __MODULE__, state, server_opts)
end

@impl Connection
def handle_connected(state) do
:ok =
state.registry
|> via(
{:dispatcher, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
"#{state.topic}"}
)
|> Dispatcher.connected()

:ok
end

@impl WebSockex
def handle_frame(
{:text, msg},
Expand Down
Loading