Skip to content

Commit 6d5dd31

Browse files
author
Brendan Ball
committed
Add backoff and reconnect support
Currently Stargate prevents the application from starting up or causes it to crash if pulsar is or becomes unavailable. This is a result of how WebSockex is implemented. This builds on a refactor of WebSockex at dominicletz/websockex#112 which implements connection management async to process startup. This makes Stargate more production ready since it will allow an application to degrade gracefully when pulsar isn't available temporarily. Main changes as a result of this: 1. Reconnect backoff feature is added to be customized by the user 2. Since Stargate will continue running even when not connected to Pulsar, the user needs to know that messages aren't being produced successfully. A breaking change was made to the async ACK MFA where the first argument is now the result of the produce, which may either be `:ok` or `{:error, reason}`.
1 parent 3c33e0a commit 6d5dd31

14 files changed

Lines changed: 537 additions & 65 deletions

File tree

lib/stargate/connection.ex

Lines changed: 134 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ defmodule Stargate.Connection do
1515
topic: String.t()
1616
}
1717

18+
@typedoc """
19+
Calculates custom connection backoff when unable to connect to Pulsar.
20+
Can be used to implement exponential backoff.
21+
Note that for an MFA tuple, the arity of the function should be `length(args) + 1` since the first
22+
arg will be the attempt.
23+
"""
24+
@type backoff_calculator ::
25+
(attempt :: non_neg_integer() -> timeout()) | {module(), fun :: atom(), args :: list()}
26+
27+
@callback handle_send_error(reason :: term, ref :: term, state :: term) :: :ok
28+
@callback handle_connected(state :: term) :: :ok
29+
1830
@doc """
1931
The Connection using macro provides the common websocket connection
2032
and keepalive functionality into a single line for replicating connection
@@ -24,15 +36,22 @@ defmodule Stargate.Connection do
2436
quote do
2537
use WebSockex
2638

39+
@behaviour Stargate.Connection
40+
@ping_interval 30_000
41+
42+
require Logger
43+
2744
@impl WebSockex
2845
def handle_connect(_conn, state) do
29-
:timer.send_interval(30_000, :send_ping)
30-
{:ok, state}
46+
tref = Process.send_after(self(), :send_ping, @ping_interval)
47+
:ok = apply(__MODULE__, :handle_connected, [state])
48+
{:ok, Map.put(state, :ping_timer_ref, tref)}
3149
end
3250

3351
@impl WebSockex
3452
def handle_info(:send_ping, state) do
35-
{:reply, :ping, state}
53+
tref = Process.send_after(self(), :send_ping, @ping_interval)
54+
{:reply, :ping, Map.put(state, :ping_timer_ref, tref)}
3655
end
3756

3857
@impl WebSockex
@@ -44,6 +63,91 @@ defmodule Stargate.Connection do
4463
def handle_pong(_pong_frame, state) do
4564
{:ok, state}
4665
end
66+
67+
@impl WebSockex
68+
def handle_disconnect(
69+
%{reason: reason, attempt_number: attempt},
70+
%{backoff_calculator: backoff_calculator} = state
71+
) do
72+
case Map.get(state, :ping_timer_ref) do
73+
nil ->
74+
:ok
75+
76+
ref ->
77+
Process.cancel_timer(ref)
78+
:ok
79+
end
80+
81+
backoff = Stargate.Connection.calculate_backoff(backoff_calculator, attempt)
82+
83+
Logger.warning(
84+
"[Stargate] disconnected",
85+
reason: inspect(reason),
86+
url: state.url,
87+
attempt: attempt,
88+
backoff_ms: backoff
89+
)
90+
91+
{:backoff, backoff, state}
92+
end
93+
94+
@impl WebSockex
95+
def handle_send_result(:ok, _frame, _ref, state) do
96+
{:ok, state}
97+
end
98+
99+
@impl WebSockex
100+
def handle_send_result({:error, reason}, :ping, _ref, state) do
101+
Logger.warning("[Stargate] error sending ping", reason: inspect(reason))
102+
{:close, state}
103+
end
104+
105+
@impl WebSockex
106+
def handle_send_result({:error, %WebSockex.ConnError{} = reason}, frame, ref, state) do
107+
Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)
108+
109+
:ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state])
110+
{:close, state}
111+
end
112+
113+
@impl WebSockex
114+
def handle_send_result({:error, %WebSockex.NotConnectedError{} = reason}, frame, ref, state) do
115+
Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)
116+
117+
:ok = apply(__MODULE__, :handle_send_error, [:not_connected, ref, state])
118+
{:ok, state}
119+
end
120+
121+
@impl WebSockex
122+
def handle_send_result({:error, reason}, frame, ref, state) do
123+
Logger.warning("[Stargate] error sending message", reason: inspect(reason), url: state.url)
124+
125+
:ok = apply(__MODULE__, :handle_send_error, [:unknown, ref, state])
126+
{:ok, state}
127+
end
128+
129+
@impl WebSockex
130+
def terminate({reason, stacktrace}, state) when is_exception(reason) do
131+
Logger.error(Exception.format(:error, reason, stacktrace))
132+
end
133+
134+
@impl WebSockex
135+
def terminate(_reason, _state) do
136+
:ok
137+
end
138+
139+
@impl Stargate.Connection
140+
def handle_send_error(reason, ref, state) do
141+
:ok
142+
end
143+
144+
@impl Stargate.Connection
145+
def handle_connected(state) do
146+
:ok
147+
end
148+
149+
defoverridable handle_send_error: 3,
150+
handle_connected: 1
47151
end
48152
end
49153

@@ -102,6 +206,33 @@ defmodule Stargate.Connection do
102206
|> Enum.map(&transform_auth/1)
103207
end
104208

209+
@doc false
210+
@spec calculate_backoff(attempt :: non_neg_integer()) :: timeout()
211+
def calculate_backoff(_attempt) do
212+
2_000
213+
end
214+
215+
@doc false
216+
@spec calculate_backoff(backoff_calculator(), attempt :: non_neg_integer()) :: timeout()
217+
def calculate_backoff(calc, attempt) do
218+
case calc do
219+
{module, function, args} ->
220+
apply(module, function, [attempt | args])
221+
222+
fun when is_function(fun, 1) ->
223+
fun.(attempt)
224+
225+
_ ->
226+
raise ArgumentError,
227+
"Backoff calculator does not conform to spec of {module, function, args} or fun/1"
228+
end
229+
end
230+
231+
@doc false
232+
def default_backoff_calculator() do
233+
{Stargate.Connection, :calculate_backoff, []}
234+
end
235+
105236
defp transform_auth({:ssl_options, _opts} = ssl_opts), do: ssl_opts
106237

107238
defp transform_auth({:auth_token, token}) do

lib/stargate/producer.ex

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule Stargate.Producer do
1212
use Puid
1313
import Stargate.Supervisor, only: [via: 2]
1414
alias Stargate.Producer.{Acknowledger, QueryParams}
15+
alias Stargate.Connection
1516

1617
@typedoc """
1718
A URL defining the host and topic to which a Stargate producer can
@@ -99,7 +100,8 @@ defmodule Stargate.Producer do
99100
When calling `produce/3` the third argument must be an MFA tuple which is used by
100101
the producer's acknowledger process to asynchronously perform acknowledgement that the
101102
message was received by the cluster successfully. This is used to avoid blocking the
102-
calling process for performance reasons.
103+
calling process for performance reasons. The result of the produce is added as a first
104+
argument when calling the MFA tuple which is either `:ok` or `{:error, reason}`.
103105
"""
104106
@spec produce(producer(), message() | [message()], {module(), atom(), [term()]}) ::
105107
:ok | {:error, term()}
@@ -130,7 +132,8 @@ defmodule Stargate.Producer do
130132
:tenant,
131133
:namespace,
132134
:topic,
133-
:query_params
135+
:query_params,
136+
:backoff_calculator
134137
]
135138
end
136139

@@ -153,6 +156,7 @@ defmodule Stargate.Producer do
153156
* `persistence` can be one of "persistent" or "non-persistent" per the Pulsar
154157
specification of topics as being in-memory only or persisted to the brokers' disks.
155158
Defaults to "persistent".
159+
* `backoff_calculator` See `Stargate.Connection.t:backoff_calculator/0`.
156160
* `query_params` is a map containing any or all of the following:
157161
158162
* `send_timeout` the time at which a produce operation will time out; defaults to 30 seconds
@@ -178,11 +182,15 @@ defmodule Stargate.Producer do
178182
query_params = QueryParams.build_params(query_params_config)
179183
registry = Keyword.fetch!(args, :registry)
180184

185+
backoff_calculator =
186+
Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator())
187+
181188
state =
182189
args
183190
|> Stargate.Connection.connection_settings(:producer, query_params)
184191
|> Map.put(:query_params, query_params_config)
185192
|> Map.put(:registry, registry)
193+
|> Map.put(:backoff_calculator, backoff_calculator)
186194
|> (fn fields -> struct(State, fields) end).()
187195

188196
server_opts =
@@ -202,17 +210,16 @@ defmodule Stargate.Producer do
202210

203211
@impl WebSockex
204212
def handle_cast({:send, payload, ctx, ack}, state) do
205-
Acknowledger.produce(
213+
ack_name =
206214
via(
207215
state.registry,
208216
{:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
209217
"#{state.topic}"}
210-
),
211-
ctx,
212-
ack
213-
)
218+
)
219+
220+
Acknowledger.produce(ack_name, ctx, ack)
214221

215-
{:reply, {:text, payload}, state}
222+
{:reply, {:text, payload}, ctx, state}
216223
end
217224

218225
@impl WebSockex
@@ -235,6 +242,19 @@ defmodule Stargate.Producer do
235242
{:ok, state}
236243
end
237244

245+
@impl Connection
246+
def handle_send_error(reason, ctx, state) do
247+
:ok =
248+
state.registry
249+
|> via(
250+
{:producer_ack, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
251+
"#{state.topic}"}
252+
)
253+
|> Acknowledger.ack({:error, reason, ctx})
254+
255+
:ok
256+
end
257+
238258
defp construct_payload(%{"payload" => _payload, "context" => context} = message) do
239259
encoded_message =
240260
message

lib/stargate/producer/acknowledger.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ defmodule Stargate.Producer.Acknowledger do
7272
send(pid, {ref, :ack})
7373

7474
{module, function, args} ->
75-
apply(module, function, args)
75+
apply(module, function, [:ok | args])
7676
end
7777

7878
{:noreply, new_state}
@@ -86,8 +86,8 @@ defmodule Stargate.Producer.Acknowledger do
8686
{pid, ref} when is_pid(pid) ->
8787
send(pid, {ref, :error, reason})
8888

89-
_mfa ->
90-
Logger.error("Failed to execute produce for reason : #{inspect(reason)}")
89+
{module, function, args} ->
90+
apply(module, function, [{:error, reason} | args])
9191
end
9292

9393
{:noreply, new_state}

lib/stargate/receiver.ex

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ defmodule Stargate.Receiver do
99
import Stargate.Supervisor, only: [via: 2]
1010
alias Stargate.{Consumer, Reader}
1111
alias Stargate.Receiver.Dispatcher
12+
alias Stargate.Connection
1213

1314
@typedoc "A string identifier assigned to each message by the cluster"
1415
@type message_id :: String.t()
@@ -57,7 +58,8 @@ defmodule Stargate.Receiver do
5758
:tenant,
5859
:namespace,
5960
:topic,
60-
:query_params
61+
:query_params,
62+
:backoff_calculator
6163
]
6264
end
6365

@@ -89,6 +91,7 @@ defmodule Stargate.Receiver do
8991
on the received messages. Defaults to 1.
9092
* `handler_init_args` is any term that will be passed to the message handler to initialize
9193
its state when a stateful handler is desired. Defaults to an empty list.
94+
* `backoff_calculator` See `Stargate.Connection.t:backoff_calculator/0`.
9295
* `query_params` is a map containing any or all of the following:
9396
9497
# Consumer
@@ -122,6 +125,9 @@ defmodule Stargate.Receiver do
122125
registry = Keyword.fetch!(args, :registry)
123126
query_params_config = Keyword.get(args, :query_params)
124127

128+
backoff_calculator =
129+
Keyword.get(args, :backoff_calculator, Stargate.Connection.default_backoff_calculator())
130+
125131
query_params =
126132
case type do
127133
:consumer -> Consumer.QueryParams.build_params(query_params_config)
@@ -130,7 +136,8 @@ defmodule Stargate.Receiver do
130136

131137
setup_state = %{
132138
registry: registry,
133-
query_params: query_params_config
139+
query_params: query_params_config,
140+
backoff_calculator: backoff_calculator
134141
}
135142

136143
state =
@@ -154,6 +161,19 @@ defmodule Stargate.Receiver do
154161
WebSockex.start_link(state.url, __MODULE__, state, server_opts)
155162
end
156163

164+
@impl Connection
165+
def handle_connected(state) do
166+
:ok =
167+
state.registry
168+
|> via(
169+
{:dispatcher, "#{state.persistence}", "#{state.tenant}", "#{state.namespace}",
170+
"#{state.topic}"}
171+
)
172+
|> Dispatcher.connected()
173+
174+
:ok
175+
end
176+
157177
@impl WebSockex
158178
def handle_frame(
159179
{:text, msg},

0 commit comments

Comments
 (0)