From 3b252ce7abe25913e85adc7d3bfe2b129331e2b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Mon, 8 Dec 2025 16:10:30 +0100 Subject: [PATCH 1/6] Introduce best_worker strategy with queue limit --- src/config/mongoose_config_spec.erl | 16 +++++++++++++--- src/wpool/mongoose_wpool.erl | 19 ++++++++++++++++++- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/config/mongoose_config_spec.erl b/src/config/mongoose_config_spec.erl index ce3d502afe..f13e55fcb8 100644 --- a/src/config/mongoose_config_spec.erl +++ b/src/config/mongoose_config_spec.erl @@ -452,7 +452,9 @@ wpool(ExtraDefaults) -> <<"strategy">> => #option{type = atom, validate = {enum, wpool_strategy_values()}}, <<"call_timeout">> => #option{type = integer, - validate = positive} + validate = positive}, + <<"queue_limit">> => #option{type = integer, + validate = positive} }, defaults = maps:merge(#{<<"workers">> => 10, <<"strategy">> => best_worker, @@ -1047,7 +1049,7 @@ check_auth_method(Method, Opts) -> process_pool([Tag, Type | _], AllOpts = #{scope := ScopeIn, connection := Connection}) -> Scope = pool_scope(ScopeIn), - Opts = maps:without([scope, host, connection], AllOpts), + Opts = verify_pool_strategy(maps:without([scope, host, connection], AllOpts)), #{type => b2a(Type), scope => Scope, tag => b2a(Tag), @@ -1058,9 +1060,17 @@ process_host_config_pool([Tag, Type, _Pools, {host, HT} | _], AllOpts = #{connec #{type => b2a(Type), scope => HT, tag => b2a(Tag), - opts => maps:remove(connection, AllOpts), + opts => verify_pool_strategy(maps:remove(connection, AllOpts)), conn_opts => Connection}. +verify_pool_strategy(Opts = #{strategy := Strategy, queue_limit := _}) + when Strategy =/= best_worker -> + error(#{what => invalid_worker_queue_limit, + text => <<"The queue_limit option can be used only with the best_worker strategy">>, + pool_options => Opts}); +verify_pool_strategy(Opts) -> + Opts. + pool_scope(host) -> host_type; pool_scope(host_type) -> host_type; pool_scope(global) -> global. diff --git a/src/wpool/mongoose_wpool.erl b/src/wpool/mongoose_wpool.erl index cadb673df2..e467b3db32 100644 --- a/src/wpool/mongoose_wpool.erl +++ b/src/wpool/mongoose_wpool.erl @@ -381,7 +381,14 @@ expand_pools(Pools, PerHostType, HostTypes) -> prepare_pool_map(Pool = #{scope := HT, opts := Opts}) -> %% Rename "scope" field to "host_type" and change wpool opts to a KV list Pool1 = maps:remove(scope, Pool), - Pool1#{host_type => HT, opts => maps:to_list(Opts)}. + Pool1#{host_type => HT, opts => maps:to_list(prepare_pool_opts(Opts))}. + +-spec prepare_pool_opts(pool_map_in()) -> pool_map_in(). +prepare_pool_opts(Opts = #{strategy := best_worker, queue_limit := Limit}) -> + Opts1 = maps:remove(queue_limit, Opts), + Opts1#{strategy := fun(Name) -> best_worker_with_limit(Name, Limit) end}; +prepare_pool_opts(Opts) -> + Opts. -spec get_unique_types([pool_map_in()], [pool_map_in()]) -> [pool_type()]. get_unique_types(Pools, HostTypeSpecific) -> @@ -404,3 +411,13 @@ instrumentation(PoolType, HostType, Tag) -> false -> [] end. + +-spec best_worker_with_limit(wpool:name(), pos_integer()) -> atom(). +best_worker_with_limit(Name, Limit) -> + Worker = wpool_pool:best_worker(Name), + case process_info(whereis(Worker), message_queue_len) of + {_, QueueLen} when QueueLen >= Limit -> + exit(no_workers); + _ -> + Worker + end. From 619b656eae99ef5063074701da11eb3757cec9cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Tue, 9 Dec 2025 14:28:16 +0100 Subject: [PATCH 2/6] Test the queue_limit config option --- test/config_parser_SUITE.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index b506b962a0..a358290e4f 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -1152,9 +1152,13 @@ test_pool_opts(Type, Required) -> ?cfg(P, default_config([outgoing_pools, Type, default, opts]), T(Required)), ?cfg(P ++ [workers], 11, T(Required#{<<"workers">> => 11})), ?cfg(P ++ [strategy], random_worker, T(Required#{<<"strategy">> => <<"random_worker">>})), + ?cfg(P ++ [queue_limit], 1000, T(Required#{<<"queue_limit">> => 1000})), ?cfg(P ++ [call_timeout], 999, T(Required#{<<"call_timeout">> => 999})), ?err(T(Required#{<<"workers">> => 0})), ?err(T(Required#{<<"strategy">> => <<"worst_worker">>})), + ?err(T(Required#{<<"queue_limit">> => 0})), + ?err(T(Required#{<<"strategy">> => <<"random_worker">>, + <<"queue_limit">> => 1000})), % queue_limit is only for best_worker ?err(T(Required#{<<"call_timeout">> => 0})). test_just_tls_client(P, T) -> From f3472f16b898a43ca3bfcda3d3fc21f0eae29c84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Mon, 8 Dec 2025 16:07:17 +0100 Subject: [PATCH 3/6] Remove msg queue limit from mongoose_rabbit_worker This functionality is now provided by the queue_limit option --- src/config/mongoose_config_spec.erl | 5 +-- src/mongoose_rabbit_worker.erl | 68 ++++++----------------------- 2 files changed, 15 insertions(+), 58 deletions(-) diff --git a/src/config/mongoose_config_spec.erl b/src/config/mongoose_config_spec.erl index f13e55fcb8..8595c11aae 100644 --- a/src/config/mongoose_config_spec.erl +++ b/src/config/mongoose_config_spec.erl @@ -546,8 +546,6 @@ outgoing_pool_connection(<<"rabbit">>) -> <<"virtual_host">> => #option{type = binary, validate = non_empty}, <<"confirms_enabled">> => #option{type = boolean}, - <<"max_worker_queue_len">> => #option{type = int_or_infinity, - validate = non_negative}, <<"tls">> => tls([client]) }, include = always, @@ -556,8 +554,7 @@ outgoing_pool_connection(<<"rabbit">>) -> <<"username">> => <<"guest">>, <<"password">> => <<"guest">>, <<"virtual_host">> => <<"/">>, - <<"confirms_enabled">> => false, - <<"max_worker_queue_len">> => 1000} + <<"confirms_enabled">> => false} }; outgoing_pool_connection(<<"rdbms">>) -> #section{ diff --git a/src/mongoose_rabbit_worker.erl b/src/mongoose_rabbit_worker.erl index d3ed2ffb12..aa308b595b 100644 --- a/src/mongoose_rabbit_worker.erl +++ b/src/mongoose_rabbit_worker.erl @@ -46,8 +46,7 @@ username := binary(), password := binary(), virtual_host := binary(), - confirms_enabled := boolean(), - max_worker_queue_len := non_neg_integer() | infinity}. + confirms_enabled := boolean()}. -type publish_result() :: boolean() | timeout | {channel_exception, any(), any()}. @@ -73,20 +72,25 @@ init(State) -> {ok, establish_rabbit_connection(State)}. -spec handle_call({amqp_call, mongoose_amqp:method()}, gen_server:from(), state()) -> - {reply, request_dropped | {ok | error | exit | throw, mongoose_amqp:method() | atom()}, - state()}. -handle_call(Req, From, State) -> - maybe_handle_request(fun do_handle_call/3, [Req, From, State], - {reply, request_dropped, State}). + {reply, {ok | error | exit | throw, mongoose_amqp:method() | atom()}, state()}. +handle_call({amqp_call, Method}, _From, State = #{channel := Channel}) -> + try amqp_channel:call(Channel, Method) of + Res -> + {reply, {ok, Res}, State} + catch + Error:Reason -> + {reply, {Error, Reason}, maybe_restart_rabbit_connection(State)} + end. -spec handle_cast({amqp_publish, mongoose_amqp:method(), mongoose_amqp:message()}, state()) -> {noreply, state()}. -handle_cast(Req, State) -> - maybe_handle_request(fun do_handle_cast/2, [Req, State], {noreply, State}). +handle_cast({amqp_publish, Method, Payload}, State) -> + handle_amqp_publish(Method, Payload, State). -spec handle_info(term(), state()) -> {noreply, state()}. handle_info(Req, State) -> - maybe_handle_request(fun do_handle_info/2, [Req, State], {noreply, State}). + ?UNEXPECTED_INFO(Req), + {noreply, State}. -spec terminate(term(), state()) -> ok. terminate(_Reason, #{connection := Connection, channel := Channel, @@ -98,27 +102,6 @@ terminate(_Reason, #{connection := Connection, channel := Channel, %%% Internal functions %%%=================================================================== --spec do_handle_call({amqp_call, mongoose_amqp:method()}, gen_server:from(), state()) -> - {reply, {ok | error | exit | throw, mongoose_amqp:method() | atom()}, state()}. -do_handle_call({amqp_call, Method}, _From, State = #{channel := Channel}) -> - try amqp_channel:call(Channel, Method) of - Res -> - {reply, {ok, Res}, State} - catch - Error:Reason -> - {reply, {Error, Reason}, maybe_restart_rabbit_connection(State)} - end. - --spec do_handle_cast({amqp_publish, mongoose_amqp:method(), mongoose_amqp:message()}, state()) -> - {noreply, state()}. -do_handle_cast({amqp_publish, Method, Payload}, State) -> - handle_amqp_publish(Method, Payload, State). - --spec do_handle_info(term(), state()) -> {noreply, state()}. -do_handle_info(Req, State) -> - ?UNEXPECTED_INFO(Req), - {noreply, State}. - -spec handle_amqp_publish(mongoose_amqp:method(), mongoose_amqp:message(), state()) -> {noreply, state()}. handle_amqp_publish(Method, Payload, State = #{host_type := HostType, pool_tag := PoolTag}) -> @@ -231,26 +214,3 @@ maybe_enable_confirms(Channel, #{confirms_enabled := true}) -> ok; maybe_enable_confirms(_Channel, #{}) -> ok. - --spec maybe_handle_request(Callback :: function(), Args :: [term()], Reply :: term()) -> term(). -maybe_handle_request(Callback, Args, Reply) -> - #{opts := #{max_worker_queue_len := Limit}} = lists:last(Args), - case is_msq_queue_max_limit_reached(Limit) of - false -> - apply(Callback, Args); - true -> - ?LOG_WARNING(#{what => rabbit_worker_request_dropped, - reason => queue_message_length_limit_reached, - limit => Limit}), - Reply - end. - --spec is_msq_queue_max_limit_reached(Limit :: infinity | non_neg_integer()) -> boolean(). -is_msq_queue_max_limit_reached(infinity) -> false; -is_msq_queue_max_limit_reached(Limit) -> - case process_info(self(), message_queue_len) of - {_, QueueLen} when QueueLen > Limit -> - true; - _Else -> - false - end. From 4f110908c6ecb42fa3cefe4a0b10cc0c719b8955 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Tue, 9 Dec 2025 18:04:59 +0100 Subject: [PATCH 4/6] Remove max_worker_queue_len from small tests --- test/common/config_parser_helper.erl | 8 +++----- test/config_parser_SUITE.erl | 4 +--- test/config_parser_SUITE_data/outgoing_pools.toml | 2 +- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/test/common/config_parser_helper.erl b/test/common/config_parser_helper.erl index 584a1ae123..a8b20d1913 100644 --- a/test/common/config_parser_helper.erl +++ b/test/common/config_parser_helper.erl @@ -361,9 +361,8 @@ options("outgoing_pools") -> root_dn => <<"cn=admin,dc=example,dc=com">>, servers => ["ldap-server.example.com"]}}, #{type => rabbit, scope => host_type, tag => event_pusher, - opts => #{workers => 20}, - conn_opts => #{confirms_enabled => true, - max_worker_queue_len => 100}}, + opts => #{workers => 20, queue_limit => 100}, + conn_opts => #{confirms_enabled => true}}, #{type => rdbms, opts => #{workers => 5}, conn_opts => #{query_timeout => 5000, keepalive_interval => 30, @@ -827,8 +826,7 @@ default_pool_conn_opts(rabbit) -> username => <<"guest">>, password => <<"guest">>, virtual_host => <<"/">>, - confirms_enabled => false, - max_worker_queue_len => 1000}; + confirms_enabled => false}; default_pool_conn_opts(redis) -> #{host => "127.0.0.1", port => 6379, diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index a358290e4f..0a034c9fdd 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -1105,14 +1105,12 @@ pool_rabbit_connection(_Config) -> ?cfg(P ++ [password], <<"pass">>, T(#{<<"password">> => <<"pass">>})), ?cfg(P ++ [virtual_host], <<"vh">>, T(#{<<"virtual_host">> => <<"vh">>})), ?cfg(P ++ [confirms_enabled], true, T(#{<<"confirms_enabled">> => true})), - ?cfg(P ++ [max_worker_queue_len], 100, T(#{<<"max_worker_queue_len">> => 100})), ?err(T(#{<<"host">> => <<>>})), ?err(T(#{<<"port">> => 123456})), ?err(T(#{<<"username">> => <<>>})), ?err(T(#{<<"password">> => <<>>})), ?err(T(#{<<"virtual_host">> => <<>>})), - ?err(T(#{<<"confirms_enabled">> => <<"yes">>})), - ?err(T(#{<<"max_worker_queue_len">> => -1})). + ?err(T(#{<<"confirms_enabled">> => <<"yes">>})). pool_rabbit_connection_tls(_Config) -> P = [outgoing_pools, 1, conn_opts, tls], diff --git a/test/config_parser_SUITE_data/outgoing_pools.toml b/test/config_parser_SUITE_data/outgoing_pools.toml index 2c4a98d140..80eff5ccd7 100644 --- a/test/config_parser_SUITE_data/outgoing_pools.toml +++ b/test/config_parser_SUITE_data/outgoing_pools.toml @@ -47,6 +47,7 @@ [outgoing_pools.rabbit.event_pusher] scope = "host_type" workers = 20 + queue_limit = 100 [outgoing_pools.rabbit.event_pusher.connection] host = "localhost" @@ -54,7 +55,6 @@ username = "guest" password = "guest" confirms_enabled = true - max_worker_queue_len = 100 [outgoing_pools.ldap.default] scope = "host_type" From e2e6bd183a59a2522c697ca02baebc4df8fd5d4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Tue, 9 Dec 2025 18:40:00 +0100 Subject: [PATCH 5/6] Add unit test for queue limit --- test/mongoose_wpool_SUITE.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/test/mongoose_wpool_SUITE.erl b/test/mongoose_wpool_SUITE.erl index 66678d192a..4ccd797ae4 100644 --- a/test/mongoose_wpool_SUITE.erl +++ b/test/mongoose_wpool_SUITE.erl @@ -38,7 +38,8 @@ all() -> dead_pool_is_restarted, dead_pool_is_stopped_before_restarted, redis_pool_cant_be_started_with_available_worker_strategy, - cassandra_prepare_opts + cassandra_prepare_opts, + queue_limit ]. %%-------------------------------------------------------------------- @@ -188,6 +189,15 @@ global_pool_is_used_by_default(_C) -> ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default), mongoose_wpool:call(generic, global, default, request)). +queue_limit(_C) -> + Pools = [#{type => generic, scope => global, tag => default, + opts => #{workers => 1, queue_limit => 1, strategy => best_worker}, conn_opts => #{}}], + StartRes = mongoose_wpool:start_configured_pools(Pools), + ?assertMatch([{ok, _}], StartRes), + ?assertEqual(ok, mongoose_wpool:cast(generic, global, default, {timer, sleep, [5000]})), + ?assertEqual(ok, mongoose_wpool:cast(generic, global, default, {timer, sleep, [1]})), + ?assertExit(no_workers, mongoose_wpool:cast(generic, global, default, {timer, sleep, [1]})). + request_behaves_as_gen_server_send_request(_C) -> Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}} ], StartRes = mongoose_wpool:start_configured_pools(Pools), From 5d5203d0b25bb2d754d357288070494c626d7343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Tue, 9 Dec 2025 18:41:26 +0100 Subject: [PATCH 6/6] Remove obsolete unit tests for rabbit --- test/mongoose_rabbit_worker_SUITE.erl | 44 +-------------------------- 1 file changed, 1 insertion(+), 43 deletions(-) diff --git a/test/mongoose_rabbit_worker_SUITE.erl b/test/mongoose_rabbit_worker_SUITE.erl index fe9ed5f83c..f4faaabe28 100644 --- a/test/mongoose_rabbit_worker_SUITE.erl +++ b/test/mongoose_rabbit_worker_SUITE.erl @@ -33,9 +33,7 @@ all() -> [ no_request_in_worker_queue_is_lost_when_amqp_call_fails, - worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails, - worker_processes_msgs_when_queue_msg_len_limit_is_not_reached, - worker_drops_msgs_when_queue_msg_len_limit_is_reached + worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails ]. %%-------------------------------------------------------------------- @@ -125,46 +123,6 @@ worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails(Config) -> ?assert(is_pid(Channel3)), ?assertNotMatch(ConnectionAndChannel2, ConnectionAndChannel3). - -worker_processes_msgs_when_queue_msg_len_limit_is_not_reached(Config) -> - %% given - Worker = proplists:get_value(worker_pid, Config), - Ref = make_ref(), - Lock = lock_fun(), - SendBack = send_back_fun(), - - %% when - gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}), - gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}), - [gen_server:cast(Worker, {amqp_publish, ok, ok}) - || _ <- lists:seq(1, ?MAX_QUEUE_LEN-1)], - - %% unlock the worker - Worker ! Ref, - - %% then - ?assertReceivedMatch(Ref, 100). - -worker_drops_msgs_when_queue_msg_len_limit_is_reached(Config) -> - %% given - Worker = proplists:get_value(worker_pid, Config), - Ref = make_ref(), - Lock = lock_fun(), - SendBack = send_back_fun(), - - %% when - gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}), - gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}), - [gen_server:cast(Worker, {amqp_publish, ok, ok}) - || _ <- lists:seq(1, ?MAX_QUEUE_LEN+1)], - - %% unlock the worker - Worker ! Ref, - - %% then - ?assertError({assertReceivedMatch_failed, _}, - ?assertReceivedMatch(Ref, 100)). - %%-------------------------------------------------------------------- %% Helpers %%--------------------------------------------------------------------