Skip to content

Commit 2fe59f6

Browse files
committed
feat: include abcast as part of the cast batch
1 parent 9f58735 commit 2fe59f6

File tree

7 files changed

+120
-19
lines changed

7 files changed

+120
-19
lines changed

include/types.hrl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,11 @@
1313
%% atom names saves a little bit of bandwidth.
1414
-define(CAST(M, F, A), {cast, M, F, A}).
1515
-define(ORDERED_CAST(M, F, A), {oc, M, F, A}).
16+
-define(ABCAST(N, M), {abcast, N, M}).
1617

17-
-define(IS_CAST(MSG), ((MSG) =:= cast orelse (MSG) =:= oc)).
18+
-define(IS_CAST_MSG(MSG),
19+
is_tuple(MSG) andalso
20+
(element(1, MSG) =:= abcast orelse
21+
element(1, MSG) =:= cast orelse
22+
element(1, MSG) =:= oc)
23+
).

src/gen_rpc.app.src

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@
8888
%% NOTE: this option has no effect unless 'socket_ip' is an IPv6 interface.
8989
{ipv6_only, false},
9090
%% ActiveN option for RPC acceptor socket
91-
{acceptor_socket_active_n, 100}
91+
{acceptor_socket_active_n, 100},
92+
%% Maximum size for cast messages to be sent as a batch
93+
{max_batch_size, 0}
9294
]},
9395
{modules, []}]
9496
}.

src/gen_rpc_acceptor.erl

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,24 +136,17 @@ waiting_for_data(info, {Driver,Socket,Data},
136136
?ORDERED_CAST(M, F, A) ->
137137
handle_cast(M, F, A, true, State),
138138
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
139+
?ABCAST(N, M) ->
140+
handle_abcast(N, M, State),
141+
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
139142
BatchCast when is_list(BatchCast) ->
140143
lists:foreach(fun(?CAST(M, F, A)) -> handle_cast(M, F, A, false, State);
141144
(?ORDERED_CAST(M, F, A)) -> handle_cast(M, F, A, true, State);
145+
(?ABCAST(N, M)) -> handle_abcast(N, M, State);
142146
(Invalid) -> ?tp(error, gen_rpc_invalid_batch, #{socket => gen_rpc_helper:socket_to_string(Socket), data => Invalid})
143147
end,
144148
BatchCast),
145149
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
146-
{abcast, Name, Msg} ->
147-
_Result = case check_if_module_allowed(erlang, Control, List) of
148-
true ->
149-
?log(debug, "event=abcast_received driver=~s socket=\"~s\" peer=\"~s\" process=~s message=\"~p\"",
150-
[Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Name, Msg]),
151-
Msg = erlang:send(Name, Msg);
152-
false ->
153-
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=~s",
154-
[Driver, gen_rpc_helper:socket_to_string(Socket), Control, abcast])
155-
end,
156-
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
157150
{sbcast, Name, Msg, Caller} ->
158151
Reply = case check_if_module_allowed(erlang, Control, List) of
159152
true ->
@@ -350,6 +343,17 @@ handle_cast(M, F, A, Ordered, #state{socket=Socket, driver=Driver, peer=Peer, co
350343
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=cast module=~s",[Driver, gen_rpc_helper:socket_to_string(Socket), Control, RealM])
351344
end.
352345

346+
handle_abcast(Name, Msg, #state{socket=Socket, driver=Driver, peer=Peer, control=Control, list=List}) ->
347+
case check_if_module_allowed(erlang, Control, List) of
348+
true ->
349+
?log(debug, "event=abcast_received driver=~s socket=\"~s\" peer=\"~s\" process=~s message=\"~p\"",
350+
[Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Name, Msg]),
351+
Msg = erlang:send({Name, node()}, Msg);
352+
false ->
353+
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=~s",
354+
[Driver, gen_rpc_helper:socket_to_string(Socket), Control, abcast])
355+
end.
356+
353357
exec_cast(M, F, A, _PreserveOrder = true) ->
354358
{Pid, MRef} = erlang:spawn_monitor(M, F, A),
355359
receive

src/gen_rpc_client.erl

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -352,15 +352,11 @@ handle_cast(Msg, #state{socket=Socket, driver=Driver} = State) ->
352352
{stop, {unknown_cast, Msg}, State}.
353353

354354
%% This is the actual CAST handler for CAST
355-
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST(Cast) ->
355+
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST_MSG(PacketTuple) ->
356356
send_cast(PacketTuple, State, SendTimeout, true);
357-
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST(Cast) ->
357+
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST_MSG(PacketTuple) ->
358358
send_cast(drain_cast(MaxBatchSize, [PacketTuple]), State, SendTimeout, true);
359359

360-
%% This is the actual CAST handler for ABCAST
361-
handle_info({{abcast,_Name,_Msg} = PacketTuple, undefined}, State) ->
362-
send_cast(PacketTuple, State, undefined, false);
363-
364360
%% This is the actual CAST handler for SBCAST
365361
handle_info({{sbcast,_Name,_Msg,_Caller} = PacketTuple, undefined}, State) ->
366362
send_cast(PacketTuple, State, undefined, true);
@@ -617,11 +613,14 @@ parse_sbcast_results([], _Ref, Results, _Timeout) ->
617613
Results.
618614

619615
drain_cast(N, CastReqs) when N =< 0 ->
616+
?tp_ignore_side_effects_in_prod(gen_rpc_cast_batch, #{ size => length(CastReqs) }),
620617
lists:reverse(CastReqs);
621618
drain_cast(N, CastReqs) ->
622619
receive
623620
{?CAST(_M,_F,_A) = Req, _} ->
624621
drain_cast(N-1, [Req | CastReqs]);
622+
{?ABCAST(_N, _M) = Req, _} ->
623+
drain_cast(N-1, [Req | CastReqs]);
625624
{?ORDERED_CAST(_M, _F, _A) = Req, _} ->
626625
drain_cast(N-1, [Req | CastReqs])
627626
after 0 ->

test/multi_rpc_SUITE.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ abcast_with_bad_server(_Config) ->
197197
end,
198198
true = erlang:unregister(test_process_123).
199199

200+
abcast_with_unregistered_name(_Config) ->
201+
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[?MASTER, ?FAKE_NODE], test_process_123, this_is_a_test]),
202+
receive
203+
_ -> erlang:error(invalid_message)
204+
after
205+
2000 -> ok
206+
end.
207+
200208
sbcast(_Config) ->
201209
true = erlang:register(test_process_123, self()),
202210
{[?MASTER], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[?MASTER], test_process_123, this_is_a_test]),

test/multi_rpc_with_key_SUITE.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ abcast_with_bad_server(_Config) ->
114114
end,
115115
true = erlang:unregister(test_process_123).
116116

117+
abcast_with_unregistered_name(_Config) ->
118+
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[{?MASTER,random_key}, ?FAKE_NODE], test_process_123, this_is_a_test]),
119+
receive
120+
_ -> erlang:error(invalid_message)
121+
after
122+
2000 -> ok
123+
end.
124+
117125
sbcast(_Config) ->
118126
true = erlang:register(test_process_123, self()),
119127
{[{?MASTER,random_key}], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[{?MASTER,random_key}], test_process_123, this_is_a_test]),

test/remote_SUITE.erl

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,80 @@ wrong_cookie(_Config) ->
351351
{badrpc, invalid_cookie} = gen_rpc:call(?SLAVE, os, timestamp, []),
352352
true = erlang:set_cookie(node(), OrigCookie).
353353

354+
multiple_casts_batch_test(_Config) ->
355+
ok = application:set_env(?APP, max_batch_size, 100),
356+
%% Send multiple casts and check that there are batches
357+
N = 10000,
358+
L = [integer_to_binary(L) || L <- lists:seq(12000, 12000 + N)],
359+
Last = lists:last(L),
360+
?check_trace(
361+
begin
362+
?wait_async_action( [begin
363+
?tp(test_cast, #{seqno => I}),
364+
true = gen_rpc:cast({?SLAVE, 1}, gen_rpc_test_helper, test_call, [I])
365+
end || I <- L]
366+
, #{?snk_kind := do_test_call, seqno := Last}
367+
),
368+
ok
369+
end,
370+
[ {"casts are batched",
371+
fun(Trace) ->
372+
Batches = ?of_kind(gen_rpc_cast_batch, Trace),
373+
% At least one batch must have a complete batch
374+
?assert(lists:any(fun(Batch) -> maps:get(size, Batch) == 101 end, Batches)),
375+
[begin
376+
#{ size := BatchSize } = Batch,
377+
?assert(BatchSize =< 101),
378+
?assert(BatchSize >= 1)
379+
end || Batch <- Batches],
380+
ok
381+
end}
382+
] ++ [fun gen_rpc_trace_props:all_casts_are_executed/1 | gen_rpc_trace_props:common_bundle()]).
383+
384+
multiple_abcasts_batch_test(_Config) ->
385+
ok = application:set_env(?APP, max_batch_size, 100),
386+
%% Send multiple abcasts and check that there are batches
387+
N = 10000,
388+
L = [integer_to_binary(L) || L <- lists:seq(12000, 12000 + N)],
389+
true = erlang:register(test_process_123, self()),
390+
?check_trace(
391+
begin
392+
[begin
393+
abcast = gen_rpc:abcast([node()], test_process_123, I)
394+
end || I <- L],
395+
%% Receive all expected messages
396+
ReceiveAll = fun F(0, Acc) ->
397+
Acc;
398+
F(Count, Acc) ->
399+
receive
400+
Msg -> F(Count - 1, [Msg | Acc])
401+
after 5000 ->
402+
ct:fail({timeout_waiting_for_messages,
403+
missing = Count,
404+
received = length(Acc)})
405+
end
406+
end,
407+
Received = ReceiveAll(length(L), []),
408+
409+
%% Verify we received all messages
410+
?assertEqual(length(L), length(Received)),
411+
?assertEqual(lists:sort(L), lists:sort(Received)),
412+
ok
413+
end,
414+
[ {"abcasts are batched",
415+
fun(Trace) ->
416+
Batches = ?of_kind(gen_rpc_cast_batch, Trace),
417+
% At least one batch must have a complete batch
418+
?assert(lists:any(fun(Batch) -> maps:get(size, Batch) == 101 end, Batches)),
419+
[begin
420+
#{ size := BatchSize } = Batch,
421+
?assert(BatchSize =< 101),
422+
?assert(BatchSize >= 1)
423+
end || Batch <- Batches],
424+
ok
425+
end}
426+
] ++ gen_rpc_trace_props:common_bundle()).
427+
354428
multiple_casts_test(_Config) ->
355429
%% Send multiple casts and check that they are received in order
356430
N = 10000,

0 commit comments

Comments
 (0)