Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion include/types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,11 @@
%% atom names saves a little bit of bandwidth.
-define(CAST(M, F, A), {cast, M, F, A}).
-define(ORDERED_CAST(M, F, A), {oc, M, F, A}).
-define(ABCAST(N, M), {abcast, N, M}).

-define(IS_CAST(MSG), ((MSG) =:= cast orelse (MSG) =:= oc)).
-define(IS_CAST_MSG(MSG),
is_tuple(MSG) andalso
(element(1, MSG) =:= abcast orelse
element(1, MSG) =:= cast orelse
element(1, MSG) =:= oc)
).
4 changes: 3 additions & 1 deletion src/gen_rpc.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@
%% NOTE: this option has no effect unless 'socket_ip' is an IPv6 interface.
{ipv6_only, false},
%% ActiveN option for RPC acceptor socket
{acceptor_socket_active_n, 100}
{acceptor_socket_active_n, 100},
%% Maximum size for cast messages to be sent as a batch
{max_batch_size, 0}
]},
{modules, []}]
}.
28 changes: 16 additions & 12 deletions src/gen_rpc_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,17 @@ waiting_for_data(info, {Driver,Socket,Data},
?ORDERED_CAST(M, F, A) ->
handle_cast(M, F, A, true, State),
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
?ABCAST(N, M) ->
handle_abcast(N, M, State),
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
BatchCast when is_list(BatchCast) ->
lists:foreach(fun(?CAST(M, F, A)) -> handle_cast(M, F, A, false, State);
(?ORDERED_CAST(M, F, A)) -> handle_cast(M, F, A, true, State);
(?ABCAST(N, M)) -> handle_abcast(N, M, State);
(Invalid) -> ?tp(error, gen_rpc_invalid_batch, #{socket => gen_rpc_helper:socket_to_string(Socket), data => Invalid, domain => ?D_ACCEPTOR})
end,
BatchCast),
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
{abcast, Name, Msg} ->
_Result = case check_if_module_allowed(erlang, Control, List) of
true ->
erlang:send(Name, Msg);
false ->
?log(debug, "request_not_allowed",
#{driver => Driver,
socket => gen_rpc_helper:socket_to_string(Socket),
control => Control,
method => abcast})
end,
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
{sbcast, Name, Msg, Caller} ->
Reply = case check_if_module_allowed(erlang, Control, List) of
true ->
Expand Down Expand Up @@ -371,6 +363,18 @@ handle_cast(M, F, A, Ordered, #state{socket=Socket, driver=Driver, peer=_Peer, c
})
end.

handle_abcast(Name, Msg, #state{socket=Socket, driver=Driver, control=Control, list=List}) ->
case check_if_module_allowed(erlang, Control, List) of
true ->
Msg = erlang:send({Name, node()}, Msg);
false ->
?log(debug, "request_not_allowed",
#{driver => Driver,
socket => gen_rpc_helper:socket_to_string(Socket),
control => Control,
method => abcast})
end.

exec_cast(M, F, A, _PreserveOrder = true) ->
{Pid, MRef} = erlang:spawn_monitor(M, F, A),
receive
Expand Down
11 changes: 5 additions & 6 deletions src/gen_rpc_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,11 @@ handle_cast(Msg, #state{socket=Socket, driver=Driver} = State) ->
{stop, {unknown_cast, Msg}, State}.

%% This is the actual CAST handler for CAST
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST(Cast) ->
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST_MSG(PacketTuple) ->
send_cast(PacketTuple, State, SendTimeout, true);
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST(Cast) ->
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST_MSG(PacketTuple) ->
send_cast(drain_cast(MaxBatchSize, [PacketTuple]), State, SendTimeout, true);

%% This is the actual CAST handler for ABCAST
handle_info({{abcast,_Name,_Msg} = PacketTuple, undefined}, State) ->
send_cast(PacketTuple, State, undefined, false);

%% This is the actual CAST handler for SBCAST
handle_info({{sbcast,_Name,_Msg,_Caller} = PacketTuple, undefined}, State) ->
send_cast(PacketTuple, State, undefined, true);
Expand Down Expand Up @@ -648,11 +644,14 @@ parse_sbcast_results([], _Ref, Results, _Timeout) ->
Results.

drain_cast(N, CastReqs) when N =< 0 ->
?tp_ignore_side_effects_in_prod(gen_rpc_cast_batch, #{ size => length(CastReqs) }),
lists:reverse(CastReqs);
drain_cast(N, CastReqs) ->
receive
{?CAST(_M,_F,_A) = Req, _} ->
drain_cast(N-1, [Req | CastReqs]);
{?ABCAST(_N, _M) = Req, _} ->
drain_cast(N-1, [Req | CastReqs]);
{?ORDERED_CAST(_M, _F, _A) = Req, _} ->
drain_cast(N-1, [Req | CastReqs])
after 0 ->
Expand Down
8 changes: 8 additions & 0 deletions test/multi_rpc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ abcast_with_bad_server(_Config) ->
end,
true = erlang:unregister(test_process_123).

abcast_with_unregistered_name(_Config) ->
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[?MASTER, ?FAKE_NODE], test_process_123, this_is_a_test]),
receive
_ -> erlang:error(invalid_message)
after
2000 -> ok
end.

sbcast(_Config) ->
true = erlang:register(test_process_123, self()),
{[?MASTER], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[?MASTER], test_process_123, this_is_a_test]),
Expand Down
8 changes: 8 additions & 0 deletions test/multi_rpc_with_key_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ abcast_with_bad_server(_Config) ->
end,
true = erlang:unregister(test_process_123).

abcast_with_unregistered_name(_Config) ->
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[{?MASTER,random_key}, ?FAKE_NODE], test_process_123, this_is_a_test]),
receive
_ -> erlang:error(invalid_message)
after
2000 -> ok
end.

sbcast(_Config) ->
true = erlang:register(test_process_123, self()),
{[{?MASTER,random_key}], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[{?MASTER,random_key}], test_process_123, this_is_a_test]),
Expand Down
74 changes: 74 additions & 0 deletions test/remote_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,80 @@ wrong_cookie(_Config) ->
{badrpc, invalid_cookie} = gen_rpc:call(?SLAVE, os, timestamp, []),
true = erlang:set_cookie(node(), OrigCookie).

multiple_casts_batch_test(_Config) ->
ok = application:set_env(?APP, max_batch_size, 100),
%% Send multiple casts and check that there are batches
N = 10000,
L = [integer_to_binary(L) || L <- lists:seq(12000, 12000 + N)],
Last = lists:last(L),
?check_trace(
begin
?wait_async_action( [begin
?tp(test_cast, #{seqno => I}),
true = gen_rpc:cast({?SLAVE, 1}, gen_rpc_test_helper, test_call, [I])
end || I <- L]
, #{?snk_kind := do_test_call, seqno := Last}
),
ok
end,
[ {"casts are batched",
fun(Trace) ->
Batches = ?of_kind(gen_rpc_cast_batch, Trace),
% At least one batch must have a complete batch
?assert(lists:any(fun(Batch) -> maps:get(size, Batch) == 101 end, Batches)),
[begin
#{ size := BatchSize } = Batch,
?assert(BatchSize =< 101),
?assert(BatchSize >= 1)
end || Batch <- Batches],
ok
end}
] ++ [fun gen_rpc_trace_props:all_casts_are_executed/1 | gen_rpc_trace_props:common_bundle()]).

multiple_abcasts_batch_test(_Config) ->
ok = application:set_env(?APP, max_batch_size, 100),
%% Send multiple abcasts and check that there are batches
N = 10000,
L = [integer_to_binary(L) || L <- lists:seq(12000, 12000 + N)],
true = erlang:register(test_process_123, self()),
?check_trace(
begin
[begin
abcast = gen_rpc:abcast([node()], test_process_123, I)
end || I <- L],
%% Receive all expected messages
ReceiveAll = fun F(0, Acc) ->
Acc;
F(Count, Acc) ->
receive
Msg -> F(Count - 1, [Msg | Acc])
after 5000 ->
ct:fail({timeout_waiting_for_messages,
missing = Count,
received = length(Acc)})
end
end,
Received = ReceiveAll(length(L), []),

%% Verify we received all messages
?assertEqual(length(L), length(Received)),
?assertEqual(lists:sort(L), lists:sort(Received)),
ok
end,
[ {"abcasts are batched",
fun(Trace) ->
Batches = ?of_kind(gen_rpc_cast_batch, Trace),
% At least one batch must have a complete batch
?assert(lists:any(fun(Batch) -> maps:get(size, Batch) == 101 end, Batches)),
[begin
#{ size := BatchSize } = Batch,
?assert(BatchSize =< 101),
?assert(BatchSize >= 1)
end || Batch <- Batches],
ok
end}
] ++ gen_rpc_trace_props:common_bundle()).

multiple_casts_test(_Config) ->
%% Send multiple casts and check that they are received in order
N = 10000,
Expand Down
Loading