Skip to content

Commit 9bcf8e5

Browse files
committed
Refactor mongoose_rabbit_worker
- Transform options directly into state instead of converting it into a proplist, and then into another map with different keys. - Remove 'TODO Refactor' as we don't want to make the connection async. This could be an option added later if necessary, but currently it is consistent with other pools, e.g. rdbms. - Add type specs for gen_server callbacks. - Enable confirms after reconnection. So far, this would happen only on the first connection attempt. - Use state and opts in more functions instead of additional data types.
1 parent 8022a7d commit 9bcf8e5

File tree

3 files changed

+89
-99
lines changed

3 files changed

+89
-99
lines changed

src/mongoose_rabbit_worker.erl

Lines changed: 82 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,20 @@
3434

3535
-ignore_xref([start_link/0]).
3636

37-
-type state() :: #{amqp_client_opts := mongoose_amqp:network_params(),
38-
connection := pid(),
39-
channel := pid(),
40-
host_type := mongooseim:host_type_or_global(),
37+
-type state() :: #{host_type := mongooseim:host_type_or_global(),
4138
pool_tag := atom(),
42-
confirms := boolean(),
43-
max_queue_len := non_neg_integer() | infinity}.
39+
opts := opts(),
40+
connection => pid(),
41+
channel => pid()}.
42+
43+
-type opts() :: #{host := string(),
44+
port := inet:port_number(),
45+
username := binary(),
46+
password := binary(),
47+
virtual_host := binary(),
48+
confirms_enabled := boolean(),
49+
max_worker_queue_len := non_neg_integer() | infinity}.
4450

45-
-type worker_opts() :: state().
4651
-type publish_result() :: boolean() | timeout | {channel_exception, any(), any()}.
4752

4853
%%%===================================================================
@@ -69,21 +74,28 @@ instrumentation(HostType, Tag) ->
6974
%%% gen_server callbacks
7075
%%%===================================================================
7176

77+
-spec init(state()) -> {ok, state()}.
7278
init(Opts) ->
7379
process_flag(trap_exit, true),
74-
%TODO: Refactor with handle_continue when OTP 21 is minimal supported version
75-
do_init(Opts).
80+
{ok, establish_rabbit_connection(Opts)}.
7681

82+
-spec handle_call({amqp_call, mongoose_amqp:method()}, gen_server:from(), state()) ->
83+
{reply, request_dropped | {ok | error | exit | throw, mongoose_amqp:method() | atom()},
84+
state()}.
7785
handle_call(Req, From, State) ->
7886
maybe_handle_request(fun do_handle_call/3, [Req, From, State],
7987
{reply, request_dropped, State}).
8088

89+
-spec handle_cast({amqp_publish, mongoose_amqp:method(), mongoose_amqp:message()}, state()) ->
90+
{noreply, state()}.
8191
handle_cast(Req, State) ->
82-
maybe_handle_request(fun do_handle_cast/2, [Req, State], {noreply, State}).
92+
maybe_handle_request(fun do_handle_cast/2, [Req, State], {noreply, State}).
8393

94+
-spec handle_info(term(), state()) -> {noreply, state()}.
8495
handle_info(Req, State) ->
8596
maybe_handle_request(fun do_handle_info/2, [Req, State], {noreply, State}).
8697

98+
-spec terminate(term(), state()) -> ok.
8799
terminate(_Reason, #{connection := Connection, channel := Channel,
88100
host_type := HostType, pool_tag := PoolTag}) ->
89101
close_rabbit_connection(Connection, Channel, HostType, PoolTag),
@@ -93,55 +105,44 @@ terminate(_Reason, #{connection := Connection, channel := Channel,
93105
%%% Internal functions
94106
%%%===================================================================
95107

96-
do_init(Opts) ->
97-
HostType = proplists:get_value(host_type, Opts),
98-
PoolTag = proplists:get_value(pool_tag, Opts),
99-
AMQPClientOpts = proplists:get_value(amqp_client_opts, Opts),
100-
{Connection, Channel} =
101-
establish_rabbit_connection(AMQPClientOpts, HostType, PoolTag),
102-
IsConfirmEnabled = maybe_enable_confirms(Channel, Opts),
103-
MaxMsgQueueLen = proplists:get_value(max_queue_len, Opts),
104-
{ok, #{host_type => HostType, amqp_client_opts => AMQPClientOpts,
105-
connection => Connection, channel => Channel,
106-
confirms => IsConfirmEnabled, max_queue_len => MaxMsgQueueLen,
107-
pool_tag => PoolTag}}.
108-
108+
-spec do_handle_call({amqp_call, mongoose_amqp:method()}, gen_server:from(), state()) ->
109+
{reply, {ok | error | exit | throw, mongoose_amqp:method() | atom()}, state()}.
109110
do_handle_call({amqp_call, Method}, _From, State = #{channel := Channel}) ->
110111
try amqp_channel:call(Channel, Method) of
111112
Res ->
112113
{reply, {ok, Res}, State}
113114
catch
114115
Error:Reason ->
115-
{FreshConn, FreshChann} = maybe_restart_rabbit_connection(State),
116-
{reply, {Error, Reason}, State#{connection := FreshConn,
117-
channel := FreshChann}}
116+
{reply, {Error, Reason}, maybe_restart_rabbit_connection(State)}
118117
end.
119118

119+
-spec do_handle_cast({amqp_publish, mongoose_amqp:method(), mongoose_amqp:message()}, state()) ->
120+
{noreply, state()}.
120121
do_handle_cast({amqp_publish, Method, Payload}, State) ->
121122
handle_amqp_publish(Method, Payload, State).
122123

124+
-spec do_handle_info(term(), state()) -> {noreply, state()}.
123125
do_handle_info(Req, State) ->
124126
?UNEXPECTED_INFO(Req),
125127
{noreply, State}.
126128

127-
-spec handle_amqp_publish(Method :: mongoose_amqp:method(),
128-
Payload :: mongoose_amqp:message(),
129-
Opts :: worker_opts()) -> {noreply, worker_opts()}.
130-
handle_amqp_publish(Method, Payload, Opts = #{host_type := HostType,
131-
pool_tag := PoolTag}) ->
132-
PublishArgs = [Method, Payload, Opts],
133-
Res = mongoose_instrument:span(wpool_rabbit_messages_published, #{host_type => HostType, pool_tag => PoolTag},
129+
-spec handle_amqp_publish(mongoose_amqp:method(), mongoose_amqp:message(), state()) ->
130+
{noreply, state()}.
131+
handle_amqp_publish(Method, Payload, State = #{host_type := HostType, pool_tag := PoolTag}) ->
132+
PublishArgs = [Method, Payload, State],
133+
Res = mongoose_instrument:span(wpool_rabbit_messages_published,
134+
#{host_type => HostType, pool_tag => PoolTag},
134135
fun publish_message_and_wait_for_confirm/3,
135136
PublishArgs,
136137
fun(PublishTime, Result) ->
137-
handle_publish_result(PublishTime, Result, HostType, PoolTag, PublishArgs)
138+
handle_publish_result(PublishTime, Result, HostType,
139+
PoolTag, PublishArgs)
138140
end),
139141
case Res of
140142
{channel_exception, _, _} ->
141-
{FreshConn, FreshChann} = maybe_restart_rabbit_connection(Opts),
142-
{noreply, Opts#{connection := FreshConn, channel := FreshChann}};
143+
{noreply, maybe_restart_rabbit_connection(State)};
143144
_ ->
144-
{noreply, Opts}
145+
{noreply, State}
145146
end.
146147

147148
-spec handle_publish_result(integer(), publish_result(), mongooseim:host_type_or_global(),
@@ -150,103 +151,97 @@ handle_amqp_publish(Method, Payload, Opts = #{host_type := HostType,
150151
handle_publish_result(PublishTime, true, HostType, PoolTag, [Method, Payload, Opts]) ->
151152
?LOG_DEBUG(#{what => rabbit_message_sent, host_type => HostType, tag => PoolTag,
152153
method => Method, payload => Payload, opts => Opts}),
153-
#{count => 1, time => PublishTime, size => byte_size(term_to_binary(Payload)), payload => Payload};
154+
#{count => 1, time => PublishTime, size => byte_size(term_to_binary(Payload)),
155+
payload => Payload};
154156
handle_publish_result(_PublishTime, false, HostType, PoolTag, [Method, Payload, Opts]) ->
155-
?LOG_WARNING(#{what => rabbit_message_sent_failed, reason => negative_ack, host_type => HostType, tag => PoolTag,
156-
method => Method, payload => Payload, opts => Opts}),
157+
?LOG_WARNING(#{what => rabbit_message_sent_failed, reason => negative_ack,
158+
host_type => HostType, tag => PoolTag, method => Method, payload => Payload,
159+
opts => Opts}),
157160
#{failed => 1, payload => Payload};
158-
handle_publish_result(_PublishTime, {channel_exception, Error, Reason}, HostType, PoolTag, [Method, Payload, Opts]) ->
161+
handle_publish_result(_PublishTime, {channel_exception, Error, Reason}, HostType, PoolTag,
162+
[Method, Payload, Opts]) ->
159163
?LOG_ERROR(#{what => rabbit_message_sent_failed,
160164
class => Error, reason => Reason, host_type => HostType, tag => PoolTag,
161165
method => Method, payload => Payload, opts => Opts}),
162166
#{failed => 1, payload => Payload};
163167
handle_publish_result(_PublishTime, timeout, HostType, PoolTag, [Method, Payload, Opts]) ->
164-
?LOG_ERROR(#{what => rabbit_message_sent_failed, reason => timeout, host_type => HostType, tag => PoolTag,
165-
method => Method, payload => Payload, opts => Opts}),
168+
?LOG_ERROR(#{what => rabbit_message_sent_failed, reason => timeout, host_type => HostType,
169+
tag => PoolTag, method => Method, payload => Payload, opts => Opts}),
166170
#{timeout => 1, payload => Payload}.
167171

168172
-spec publish_message_and_wait_for_confirm(Method :: mongoose_amqp:method(),
169-
Payload :: mongoose_amqp:message(),
170-
worker_opts()) ->
173+
Payload :: mongoose_amqp:message(), state()) ->
171174
publish_result().
172-
publish_message_and_wait_for_confirm(Method, Payload,
173-
#{channel := Channel,
174-
confirms := IsConfirmEnabled}) ->
175+
publish_message_and_wait_for_confirm(Method, Payload, #{channel := Channel, opts := Opts}) ->
175176
try amqp_channel:call(Channel, Method, Payload) of
176177
_Res ->
177-
maybe_wait_for_confirms(Channel, IsConfirmEnabled)
178+
maybe_wait_for_confirms(Channel, Opts)
178179
catch
179180
Error:Reason -> {channel_exception, Error, Reason}
180181
end.
181182

182-
-spec maybe_wait_for_confirms(Channel :: pid(), boolean()) ->
183-
boolean() | timeout.
184-
maybe_wait_for_confirms(Channel, true) ->
183+
-spec maybe_wait_for_confirms(Channel :: pid(), opts()) -> boolean() | timeout.
184+
maybe_wait_for_confirms(Channel, #{confirms_enabled := true}) ->
185185
amqp_channel:wait_for_confirms(Channel);
186-
maybe_wait_for_confirms(_, _) -> true.
186+
maybe_wait_for_confirms(_, _) ->
187+
true.
187188

188-
-spec maybe_restart_rabbit_connection(worker_opts()) -> {pid(), pid()}.
189-
maybe_restart_rabbit_connection(#{connection := Conn, host_type := HostType,
190-
pool_tag := PoolTag,
191-
amqp_client_opts := AMQPOpts}) ->
189+
-spec maybe_restart_rabbit_connection(state()) -> state().
190+
maybe_restart_rabbit_connection(#{connection := Conn} = State) ->
192191
case is_process_alive(Conn) of
193192
true ->
194193
{ok, Channel} = amqp_connection:open_channel(Conn),
195-
{Conn, Channel};
194+
State#{channel := Channel};
196195
false ->
197-
establish_rabbit_connection(AMQPOpts, HostType, PoolTag)
196+
establish_rabbit_connection(State)
198197
end.
199198

200-
-spec establish_rabbit_connection(Opts :: mongoose_amqp:network_params(),
201-
HostType :: mongooseim:host_type_or_global(), PoolTag :: atom())
202-
-> {pid(), pid()}.
203-
establish_rabbit_connection(AMQPOpts, HostType, PoolTag) ->
204-
case amqp_connection:start(AMQPOpts) of
199+
-spec establish_rabbit_connection(state()) -> state().
200+
establish_rabbit_connection(State) ->
201+
#{opts := Opts, host_type := HostType, pool_tag := PoolTag} = State,
202+
case amqp_connection:start(mongoose_amqp:network_params(Opts)) of
205203
{ok, Connection} ->
206-
mongoose_instrument:execute(wpool_rabbit_connections, #{host_type => HostType, pool_tag => PoolTag},
204+
mongoose_instrument:execute(wpool_rabbit_connections,
205+
#{host_type => HostType, pool_tag => PoolTag},
207206
#{active => 1, opened => 1}),
208207
{ok, Channel} = amqp_connection:open_channel(Connection),
208+
maybe_enable_confirms(Channel, Opts),
209209
?LOG_DEBUG(#{what => rabbit_connection_established,
210-
host_type => HostType, pool_tag => PoolTag, opts => AMQPOpts}),
211-
{Connection, Channel};
210+
host_type => HostType, pool_tag => PoolTag, opts => Opts}),
211+
State#{connection => Connection, channel => Channel};
212212
{error, Error} ->
213-
mongoose_instrument:execute(wpool_rabbit_connections, #{host_type => HostType, pool_tag => PoolTag},
213+
mongoose_instrument:execute(wpool_rabbit_connections,
214+
#{host_type => HostType, pool_tag => PoolTag},
214215
#{failed => 1}),
215216
?LOG_ERROR(#{what => rabbit_connection_failed, reason => Error,
216-
host_type => HostType, pool_tag => PoolTag, opts => AMQPOpts}),
217+
host_type => HostType, pool_tag => PoolTag, opts => Opts}),
217218
exit("connection to a Rabbit server failed")
218219
end.
219220

220221
-spec close_rabbit_connection(Connection :: pid(), Channel :: pid(),
221222
HostType :: mongooseim:host_type_or_global(), PoolTag :: atom()) ->
222223
ok | no_return().
223224
close_rabbit_connection(Connection, Channel, HostType, PoolTag) ->
224-
mongoose_instrument:execute(wpool_rabbit_connections, #{host_type => HostType, pool_tag => PoolTag},
225+
mongoose_instrument:execute(wpool_rabbit_connections,
226+
#{host_type => HostType, pool_tag => PoolTag},
225227
#{active => -1, closed => 1}),
226228
try amqp_channel:close(Channel)
227229
catch
228230
_Error:_Reason -> already_closed
229231
end,
230232
amqp_connection:close(Connection).
231233

232-
-spec maybe_enable_confirms(Channel :: pid(), proplists:proplist()) ->
233-
boolean() | no_return().
234-
maybe_enable_confirms(Channel, Opts) ->
235-
case proplists:get_value(confirms, Opts) of
236-
true ->
237-
ConfirmCallRes = mongoose_amqp:confirm_select_ok(),
238-
ConfirmCallRes =
239-
amqp_channel:call(Channel, mongoose_amqp:confirm_select()),
240-
true;
241-
false ->
242-
false
243-
end.
234+
-spec maybe_enable_confirms(Channel :: pid(), opts()) -> ok.
235+
maybe_enable_confirms(Channel, #{confirms_enabled := true}) ->
236+
ConfirmCallRes = mongoose_amqp:confirm_select_ok(),
237+
ConfirmCallRes = amqp_channel:call(Channel, mongoose_amqp:confirm_select()),
238+
ok;
239+
maybe_enable_confirms(_Channel, #{}) ->
240+
ok.
244241

245-
-spec maybe_handle_request(Callback :: function(), Args :: [term()],
246-
Reply :: term()) -> term().
242+
-spec maybe_handle_request(Callback :: function(), Args :: [term()], Reply :: term()) -> term().
247243
maybe_handle_request(Callback, Args, Reply) ->
248-
State = lists:last(Args),
249-
Limit = maps:get(max_queue_len, State),
244+
#{opts := #{max_worker_queue_len := Limit}} = lists:last(Args),
250245
case is_msq_queue_max_limit_reached(Limit) of
251246
false ->
252247
apply(Callback, Args);
@@ -257,8 +252,7 @@ maybe_handle_request(Callback, Args, Reply) ->
257252
Reply
258253
end.
259254

260-
-spec is_msq_queue_max_limit_reached(Limit :: infinity | non_neg_integer()) ->
261-
boolean().
255+
-spec is_msq_queue_max_limit_reached(Limit :: infinity | non_neg_integer()) -> boolean().
262256
is_msq_queue_max_limit_reached(infinity) -> false;
263257
is_msq_queue_max_limit_reached(Limit) ->
264258
case process_info(self(), message_queue_len) of

src/wpool/mongoose_wpool_rabbit.erl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,11 @@ init() ->
1212
-spec start(mongooseim:host_type_or_global(), mongoose_wpool:tag(),
1313
mongoose_wpool:pool_opts(), mongoose_wpool:conn_opts()) -> {ok, pid()} | {error, any()}.
1414
start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
15-
#{confirms_enabled := Confirms, max_worker_queue_len := MaxQueueLen} = ConnOpts,
1615
PoolName = mongoose_wpool:make_pool_name(rabbit, HostType, Tag),
1716
Worker = {mongoose_rabbit_worker,
18-
[{amqp_client_opts, mongoose_amqp:network_params(ConnOpts)},
19-
{host_type, HostType},
20-
{pool_tag, Tag},
21-
{confirms, Confirms},
22-
{max_queue_len, MaxQueueLen}]},
17+
#{host_type => HostType,
18+
pool_tag => Tag,
19+
opts => ConnOpts}},
2320
WpoolOpts = [{worker, Worker}, {pool_sup_shutdown, timer:seconds(5)} | WpoolOptsIn],
2421
mongoose_wpool:start_sup_pool(rabbit, PoolName, WpoolOpts).
2522

test/mongoose_rabbit_worker_SUITE.erl

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,15 @@ end_per_suite(Config) ->
5454
Config.
5555

5656
init_per_testcase(_Case, Config) ->
57-
WorkerOpts = [{host_type, ?HOST_TYPE},
58-
{pool_tag, test_tag},
59-
{amqp_client_opts, []},
60-
{confirms, false},
61-
{max_queue_len, ?MAX_QUEUE_LEN}],
6257
mongoose_instrument:start_link(),
6358
mongoose_instrument:set_up(mongoose_wpool_rabbit:instrumentation(?HOST_TYPE, test_tag)),
59+
WorkerOpts = #{host_type => ?HOST_TYPE, pool_tag => test_tag, opts => conn_opts()},
6460
{ok, WorkerPid} = gen_server:start(mongoose_rabbit_worker, WorkerOpts, []),
6561
Config ++ [{worker_pid, WorkerPid}].
6662

63+
conn_opts() ->
64+
config_parser_helper:default_config([outgoing_pools, rabbit, test_tag, conn_opts]).
65+
6766
end_per_testcase(_Case, Config) ->
6867
mongoose_instrument:tear_down(mongoose_wpool_rabbit:instrumentation(?HOST_TYPE, test_tag)),
6968
exit(proplists:get_value(worker_pid, Config), kill),

0 commit comments

Comments
 (0)