Skip to content

Commit 09f1e67

Browse files
committed
Implement reconnection [WIP]
1 parent 9e33b57 commit 09f1e67

File tree

5 files changed

+58
-7
lines changed

5 files changed

+58
-7
lines changed

src/config/mongoose_config_spec.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,8 @@ outgoing_pool_connection(<<"rabbit">>) ->
546546
<<"confirms_enabled">> => #option{type = boolean},
547547
<<"max_worker_queue_len">> => #option{type = int_or_infinity,
548548
validate = non_negative},
549-
<<"tls">> => tls([client])
549+
<<"tls">> => tls([client]),
550+
<<"reconnect">> => rabbit_reconnect()
550551
},
551552
include = always,
552553
defaults = #{<<"host">> => "localhost",
@@ -632,6 +633,15 @@ sql_tls() ->
632633
sql_tls_extra() ->
633634
#section{items = #{<<"required">> => #option{type = boolean}}}.
634635

636+
%% path: outgoing_pools.rabbit.*.connection.reconnect
637+
rabbit_reconnect() ->
638+
#section{items = #{<<"attempts">> => #option{type = integer, validate = positive},
639+
<<"delay">> => #option{type = integer, validate = non_negative}},
640+
defaults = #{<<"attempts">> => 10,
641+
<<"delay">> => 5000 % milliseconds
642+
}
643+
}.
644+
635645
%% TLS options
636646

637647
tls(Entities) when is_list(Entities) ->

src/mongoose_rabbit_worker.erl

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@
4646
password := binary(),
4747
virtual_host := binary(),
4848
confirms_enabled := boolean(),
49-
max_worker_queue_len := non_neg_integer() | infinity}.
49+
max_worker_queue_len := non_neg_integer() | infinity,
50+
reconnect => reconnect()}.
51+
52+
-type reconnect() :: #{attempts := pos_integer(),
53+
delay := non_neg_integer() % milliseconds
54+
}.
5055

5156
-type publish_result() :: boolean() | timeout | {channel_exception, any(), any()}.
5257

@@ -197,7 +202,30 @@ maybe_restart_rabbit_connection(#{connection := Conn} = State) ->
197202
end.
198203

199204
-spec establish_rabbit_connection(state()) -> state().
205+
establish_rabbit_connection(State = #{opts := #{reconnect := #{attempts := Attempts}}}) ->
206+
establish_rabbit_connection(State, Attempts);
200207
establish_rabbit_connection(State) ->
208+
establish_rabbit_connection(State, 0).
209+
210+
-spec establish_rabbit_connection(state(), non_neg_integer()) -> state().
211+
establish_rabbit_connection(State, RemainingAttempts) ->
212+
case start_amqp_connection(State) of
213+
{ok, NewState} ->
214+
NewState;
215+
{error, Error} when RemainingAttempts > 0 ->
216+
?LOG_WARNING(#{what => rabbit_connection_failed, reason => Error, worker_state => State,
217+
remaining_attempts => RemainingAttempts}),
218+
#{opts := #{reconnect := #{delay := Delay}}} = State,
219+
timer:sleep(Delay),
220+
establish_rabbit_connection(State, RemainingAttempts - 1);
221+
{error, Error} when RemainingAttempts =:= 0 ->
222+
ErrorInfo = #{what => rabbit_connection_failed, reason => Error, worker_state => State},
223+
?LOG_ERROR(ErrorInfo),
224+
exit(ErrorInfo)
225+
end.
226+
227+
-spec start_amqp_connection(state()) -> {ok, state()} | {error, term()}.
228+
start_amqp_connection(State) ->
201229
#{opts := Opts, host_type := HostType, pool_tag := PoolTag} = State,
202230
case amqp_connection:start(mongoose_amqp:network_params(Opts)) of
203231
{ok, Connection} ->
@@ -208,14 +236,12 @@ establish_rabbit_connection(State) ->
208236
maybe_enable_confirms(Channel, Opts),
209237
?LOG_DEBUG(#{what => rabbit_connection_established,
210238
host_type => HostType, pool_tag => PoolTag, opts => Opts}),
211-
State#{connection => Connection, channel => Channel};
239+
{ok, State#{connection => Connection, channel => Channel}};
212240
{error, Error} ->
213241
mongoose_instrument:execute(wpool_rabbit_connections,
214242
#{host_type => HostType, pool_tag => PoolTag},
215243
#{failed => 1}),
216-
?LOG_ERROR(#{what => rabbit_connection_failed, reason => Error,
217-
host_type => HostType, pool_tag => PoolTag, opts => Opts}),
218-
exit("connection to a Rabbit server failed")
244+
{error, Error}
219245
end.
220246

221247
-spec close_rabbit_connection(Connection :: pid(), Channel :: pid(),

test/common/config_parser_helper.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,8 @@ options("outgoing_pools") ->
363363
#{type => rabbit, scope => host_type, tag => event_pusher,
364364
opts => #{workers => 20},
365365
conn_opts => #{confirms_enabled => true,
366-
max_worker_queue_len => 100}},
366+
max_worker_queue_len => 100,
367+
reconnect => #{attempts => 5, delay => 2000}}},
367368
#{type => rdbms,
368369
opts => #{workers => 5},
369370
conn_opts => #{query_timeout => 5000, keepalive_interval => 30,
@@ -1302,6 +1303,8 @@ default_config([outgoing_pools, Type, _Tag, opts]) ->
13021303
default_pool_wpool_opts(Type);
13031304
default_config([outgoing_pools, Type, _Tag, conn_opts]) ->
13041305
default_pool_conn_opts(Type);
1306+
default_config([outgoing_pools, rabbit, _Tag, conn_opts, reconnect]) ->
1307+
#{attempts => 10, delay => 5000};
13051308
default_config([outgoing_pools, _Type, _Tag, conn_opts, tls]) ->
13061309
maps:merge(default_tls(), #{server_name_indication => default_sni()});
13071310
default_config([outgoing_pools, _Type, _Tag, conn_opts, tls, server_name_indication]) ->

test/config_parser_SUITE.erl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ groups() ->
145145
pool_elastic_connection,
146146
pool_rabbit,
147147
pool_rabbit_connection,
148+
pool_rabbit_connection_reconnect,
148149
pool_rabbit_connection_tls,
149150
pool_ldap,
150151
pool_ldap_connection,
@@ -1114,6 +1115,15 @@ pool_rabbit_connection(_Config) ->
11141115
?err(T(#{<<"confirms_enabled">> => <<"yes">>})),
11151116
?err(T(#{<<"max_worker_queue_len">> => -1})).
11161117

1118+
pool_rabbit_connection_reconnect(_Config) ->
1119+
P = [outgoing_pools, 1, conn_opts, reconnect],
1120+
T = fun(Opts) -> pool_conn_raw(<<"rabbit">>, #{<<"reconnect">> => Opts}) end,
1121+
?cfg(P, default_config([outgoing_pools, rabbit, default, conn_opts, reconnect]), T(#{})),
1122+
?cfg(P ++ [attempts], 5, T(#{<<"attempts">> => 5})),
1123+
?cfg(P ++ [delay], 0, T(#{<<"delay">> => 0})), % not recommended but allowed
1124+
?err(T(#{<<"attempts">> => 0})), % disabling is achieved by omitting this subsection instead
1125+
?err(T(#{<<"delay">> => <<"infinity">>})).
1126+
11171127
pool_rabbit_connection_tls(_Config) ->
11181128
P = [outgoing_pools, 1, conn_opts, tls],
11191129
T = fun(Opts) -> pool_conn_raw(<<"rabbit">>, #{<<"tls">> => Opts}) end,

test/config_parser_SUITE_data/outgoing_pools.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
password = "guest"
5656
confirms_enabled = true
5757
max_worker_queue_len = 100
58+
reconnect.attempts = 5
59+
reconnect.delay = 2000
5860

5961
[outgoing_pools.ldap.default]
6062
scope = "host_type"

0 commit comments

Comments
 (0)