Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@
queries_waiting_heartbeats := queue:queue({non_neg_integer(),
consistent_query_ref()}),
pending_consistent_queries := [consistent_query_ref()],
commit_latency => option(non_neg_integer())
commit_latency => option(non_neg_integer()),
meta_fd => option(file:fd())
}.

-type state() :: ra_server_state().
Expand Down Expand Up @@ -349,9 +350,11 @@ init(#{id := Id,
end,

MetaName = meta_name(SystemConfig),
CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0),
LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0),
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),
DataDir = maps:get(data_dir, SystemConfig),
MetaFile = ra_server_meta:path(DataDir, UId),

{ok, {VotedFor, CurrentTerm, LastApplied}} = ra_server_meta:fetch(MetaFile, MetaName, UId),
{ok, MetaFd} = file:open(MetaFile, [read, write, raw, binary]),

LatestMacVer = ra_machine:version(Machine),
InitialMachineVersion = min(LatestMacVer,
Expand Down Expand Up @@ -420,7 +423,8 @@ init(#{id := Id,
aux_state => ra_machine:init_aux(MacMod, Name),
query_index => 0,
queries_waiting_heartbeats => queue:new(),
pending_consistent_queries => []}.
pending_consistent_queries => [],
meta_fd => MetaFd}.

recover(#{cfg := #cfg{log_id = LogId,
machine_version = MacVer,
Expand Down Expand Up @@ -2238,8 +2242,8 @@ persist_last_applied(#{persisted_last_applied := PLA,
% if last applied is less than PL for some reason do nothing
State;
persist_last_applied(#{last_applied := LastApplied,
cfg := #cfg{uid = UId} = Cfg} = State) ->
ok = ra_log_meta:store(meta_name(Cfg), UId, last_applied, LastApplied),
meta_fd := MetaFd} = State) ->
ok = ra_server_meta:update_last_applied(MetaFd, LastApplied),
State#{persisted_last_applied => LastApplied}.


Expand Down Expand Up @@ -2355,13 +2359,17 @@ handle_node_status(RaftState, Type, Node, Status, _Info,

-spec terminate(ra_server_state(), Reason :: {shutdown, delete} | term()) -> ok.
terminate(#{log := Log,
meta_fd := MetaFd,
cfg := #cfg{log_id = LogId}} = _State, {shutdown, delete}) ->
?NOTICE("~ts: terminating with reason 'delete'", [LogId]),
_ = file:close(MetaFd),
catch ra_log:delete_everything(Log),
ok;
terminate(#{cfg := #cfg{log_id = LogId}} = State, Reason) ->
terminate(#{cfg := #cfg{log_id = LogId}, meta_fd := MetaFd} = State, Reason) ->
?DEBUG("~ts: terminating with reason '~w'", [LogId, Reason]),
#{log := Log} = persist_last_applied(State),
_ = file:sync(MetaFd),
_ = file:close(MetaFd),
catch ra_log:close(Log),
ok.

Expand Down Expand Up @@ -2563,18 +2571,17 @@ peer(PeerId, #{cluster := Nodes}) ->
put_peer(PeerId, Peer, #{cluster := Peers} = State) ->
State#{cluster => Peers#{PeerId => Peer}}.

update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg,
current_term := CurTerm} = State) ->
update_term_and_voted_for(Term, VotedFor, #{cfg := Cfg,
current_term := CurTerm,
meta_fd := MetaFd} = State) ->
CurVotedFor = maps:get(voted_for, State, undefined),
case Term =:= CurTerm andalso VotedFor =:= CurVotedFor of
true ->
%% no update needed
State;
false ->
MetaName = meta_name(Cfg),
%% as this is a rare event it is ok to go sync here
ok = ra_log_meta:store(MetaName, UId, current_term, Term),
ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor),
LastApplied = maps:get(last_applied, State, 0),
ok = ra_server_meta:store_sync(MetaFd, VotedFor, Term, LastApplied),
incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term),
%% this is probably not necessary
Expand Down Expand Up @@ -3413,8 +3420,6 @@ put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
put_counter(#cfg{counter = undefined}, _Ix, _N) ->
ok.

meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
Name;
meta_name(#{names := #{log_meta := Name}}) ->
Name.

Expand Down
211 changes: 211 additions & 0 deletions src/ra_server_meta.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
-module(ra_server_meta).

-include_lib("stdlib/include/assert.hrl").

-export([
path/2,
fetch/3,
fetch_from_file/1,
store_sync/4,
update_last_applied/2
]).

%% This module implements persistence for server metadata.
%% Before Ra 3.0, metadata was stored in a DETS file, shared
%% by all Ra servers in a Ra system.
%% Now, we store each server's metadata in a separate file,
%% in the server's data directory.
%% The structure of the metadata file is as follows:
%% - 4 bytes magic header (RAM1)
%% - 1004 bytes VotedFor field, which is a binary
%% - 1 byte for the size of the first atom (server name)
%% - first atom (server name) as a binary
%% - 1 byte for the size of the second atom (node name)
%% - second atom (node name) as a binary
%% - padding (zeroed)
%% - 8 bytes CurrentTerm (unsigned 64-bit integer)
%% - 8 bytes LastApplied (unsigned 64-bit integer)
%% for a total of 1024 bytes
%%
%% When VotedFor/Term change, the file is updated and fsynced.
%% If only the LastApplied changes, we update but do not fsync,
%% since this would be prohibitively slow.

%% Name of the per-server metadata file.
-define(FILENAME, "server.meta").
%% 4-byte magic header written at the start of the metadata file.
-define(MAGIC, "RAM1").
%% Fixed size, in bytes, of the whole metadata file.
-define(TOTAL_SIZE, 1024).
%% Byte offsets of the two trailing 64-bit fields: CurrentTerm is stored
%% immediately before LastApplied, which occupies the last 8 bytes.
%% The expansions are parenthesised so the macros stay correct when they are
%% used inside a larger arithmetic expression (macro expansion is textual).
-define(LAST_APPLIED_POSITION, (?TOTAL_SIZE - 8)).
-define(TERM_POSITION, (?LAST_APPLIED_POSITION - 8)).

%% Builds the path of a server's metadata file: the ?FILENAME entry inside
%% the UId sub-directory of DataDir.
path(DataDir, UId) ->
    filename:join([DataDir, UId, ?FILENAME]).

%% Loads the {VotedFor, CurrentTerm, LastApplied} metadata of a server.
%%
%% The metadata file at Path is tried first. If reading it yields any
%% {error, _} (including {error, invalid_format}), the values are taken from
%% ra_log_meta instead and written to Path.
%%
%% Returns {ok, {VotedFor, CurrentTerm, LastApplied}}, or whatever
%% store_sync/4 returned when writing the migrated values did not yield ok.
fetch(Path, MetaName, UId) ->
    case fetch_from_file(Path) of
        {error, _} ->
            migrate_to_file(Path, MetaName, UId);
        {ok, Metadata} = Found when is_tuple(Metadata) ->
            Found
    end.

%% Metadata migration: copies the values held by ra_log_meta into the file at
%% Path and, only once that write has succeeded, deletes the ra_log_meta
%% entry for UId.
migrate_to_file(Path, MetaName, UId) ->
    Legacy = fetch_from_ra_log_meta(MetaName, UId),
    {VotedFor, CurrentTerm, LastApplied} = Legacy,
    case store_sync(Path, VotedFor, CurrentTerm, LastApplied) of
        ok ->
            ra_log_meta:delete(MetaName, UId),
            {ok, Legacy};
        Failure ->
            Failure
    end.

%% Reads and decodes the metadata file at Path.
%%
%% Returns {ok, {VotedFor, CurrentTerm, LastApplied}} when the contents have
%% the expected layout (magic header, 1004-byte VotedFor field, two unsigned
%% 64-bit integers), {error, invalid_format} when the file was read but has
%% any other shape, and the file:read_file/1 result unchanged otherwise.
fetch_from_file(Path) ->
    case file:read_file(Path) of
        {ok, Contents} ->
            decode_file(Contents);
        ReadError ->
            ReadError
    end.

%% Decodes the complete contents of a metadata file.
decode_file(<<?MAGIC, VotedForBin:1004/binary,
              CurrentTerm:64/unsigned, LastApplied:64/unsigned>>) ->
    {ok, {decode_voted_for(VotedForBin), CurrentTerm, LastApplied}};
decode_file(_Other) ->
    {error, invalid_format}.

%% Parses the VotedFor field; any exception raised while parsing is mapped
%% to 'undefined'.
decode_voted_for(VotedForBin) ->
    try
        parse_voted_for(VotedForBin)
    catch
        _:_ -> undefined
    end.

%% Reads voted_for, current_term and last_applied for UId from ra_log_meta,
%% in that order, falling back to undefined, 0 and 0 respectively.
%% Returns {VotedFor, CurrentTerm, LastApplied}.
fetch_from_ra_log_meta(MetaName, UId) ->
    KeysWithDefaults = [{voted_for, undefined},
                        {current_term, 0},
                        {last_applied, 0}],
    [VotedFor, CurrentTerm, LastApplied] =
        [ra_log_meta:fetch(MetaName, UId, Key, Default)
         || {Key, Default} <- KeysWithDefaults],
    {VotedFor, CurrentTerm, LastApplied}.

%% Writes the complete metadata record and fsyncs it.
%%
%% The first argument is either a file name (binary or string) or an already
%% open file descriptor:
%%  - file name: the file is opened for writing, the record is written and
%%    synced, and the file is closed again. Returns ok, or {error, Reason}
%%    when the file cannot be opened.
%%  - descriptor: the record is written at offset 0 and synced; the
%%    descriptor is left open. Returns ok.
%% In both cases a failing write or sync crashes with badmatch.
store_sync(MetaFile, VotedFor, CurrentTerm, LastApplied)
  when is_binary(MetaFile); is_list(MetaFile) ->
    case file:open(MetaFile, [write, binary, raw]) of
        {ok, MetaFd} ->
            try
                store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied)
            after
                % always release the descriptor, even if the write crashed;
                % the data has already been synced on the success path
                _ = file:close(MetaFd)
            end;
        {error, _} = Err ->
            Err
    end;
store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied) ->
    Data = encode_metadata(VotedFor, CurrentTerm, LastApplied),
    ok = file:pwrite(MetaFd, 0, Data),
    ok = file:sync(MetaFd).

%% Overwrites only the LastApplied field of an open metadata file.
%% The value is written as an unsigned 64-bit integer at
%% ?LAST_APPLIED_POSITION; no fsync is performed here.
%% Crashes with badmatch unless the write returns ok.
update_last_applied(MetaFd, LastApplied) ->
    Encoded = <<LastApplied:64/unsigned>>,
    ok = file:pwrite(MetaFd, ?LAST_APPLIED_POSITION, Encoded).

encode_metadata(VotedFor, CurrentTerm, LastApplied) ->
VotedForBin = case VotedFor of
undefined ->
<<0, 0>>;
{NameAtom, NodeAtom} ->
NameAtomBin = atom_to_binary(NameAtom, utf8),
NodeAtomBin = atom_to_binary(NodeAtom, utf8),
NameSize = byte_size(NameAtomBin),
NodeSize = byte_size(NodeAtomBin),
<<NameSize:8/unsigned, NameAtomBin/binary,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just noting that although an atom can only contain max 255 (possibly unicode) characters, the binary representation might require more than 255 bytes

1> Atom = list_to_atom(lists:duplicate(255, 256)).
'ĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀĀ'
2> byte_size(atom_to_binary(Atom)).
510

term_to_binary stores the atom size in 1 or 2 bytes https://www.erlang.org/doc/apps/erts/erl_ext_dist.html#atom_utf8_ext

sorry if this was considered already, and storing the size in 1 byte is intentional

Copy link
Contributor

@kjnilsson kjnilsson Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not considered.

I thought it would still limit utf8 atoms to 255 bytes. This is slightly awkward as we'd theoretically need 4096 bytes just for the two atoms which would then take us over the standard block size.

We could consider compressing but we'd still need to ensure we're below 4000 or so bytes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it's late and I clearly can't do math. We'd just need 2052 bytes to encode the atoms so we're still well below the block size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for pointing this out. I've modified the format and code to use 2 bytes for atom sizes. Technically, if both atoms were full of UTF-8, we could still fail, but what are the odds?! :) Based on our testing, node names can't be UTF-8 (and they would need to be full of them for this to be a problem)

NodeSize:8/unsigned, NodeAtomBin/binary>>
end,

HeaderSize = length(?MAGIC),
VotedForSize = byte_size(VotedForBin),
UsedSize = HeaderSize + VotedForSize,
PaddingSize = 1008 - UsedSize,
Padding = <<0:PaddingSize/unit:8>>,

<<?MAGIC, VotedForBin/binary, Padding/binary,
CurrentTerm:64/unsigned, LastApplied:64/unsigned>>.

%% Decodes the VotedFor field of the metadata file.
%%
%% The field holds two length-prefixed names: a 1-byte size followed by that
%% many bytes for the first atom, then the same for the second atom.
%% Anything after the second name is ignored.
%% Returns {NameAtom, NodeAtom}, or 'undefined' when either size is zero or
%% the input does not contain both complete names.
parse_voted_for(<<NameSize:8/unsigned, Name:NameSize/binary,
                  NodeSize:8/unsigned, Node:NodeSize/binary,
                  _Padding/binary>>)
  when NameSize > 0, NodeSize > 0 ->
    {binary_to_atom(Name, utf8), binary_to_atom(Node, utf8)};
parse_voted_for(_NoVote) ->
    undefined.

%%% ===================
%%% Internal unit tests
%%% ===================

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

%% Exercises the metadata file format: encoding always yields 1024 bytes,
%% VotedFor / CurrentTerm / LastApplied can be read back from the encoded
%% binary and from a file, and a wrong magic header is rejected.
v1_format_test() ->
    CurrentTerm = rand:uniform(10000),
    LastApplied = rand:uniform(100000),
    VotedFor = {somename, somenode},

    % we always encode into a 1024-byte binary
    Data = encode_metadata(VotedFor, CurrentTerm, LastApplied),
    ?assertEqual(1024, byte_size(Data)),

    % we can reconstruct the VotedFor from the binary
    <<"RAM1", VotedForBin/binary>> = Data,
    ?assertEqual({somename, somenode}, parse_voted_for(VotedForBin)),

    % we can extract term and last applied from fixed positions
    <<_:1008/binary, ParsedTerm:64/unsigned, ParsedLastApplied:64/unsigned>> = Data,
    ?assertEqual(CurrentTerm, ParsedTerm),
    ?assertEqual(LastApplied, ParsedLastApplied),

    % "empty" metadata
    EmptyData = encode_metadata(undefined, 0, 0),
    ?assertEqual(1024, byte_size(EmptyData)),
    <<"RAM1", VotedForDataUndef/binary>> = EmptyData,
    ?assertEqual(undefined, parse_voted_for(VotedForDataUndef)),
    <<_:1008/binary, ZeroTerm:64/unsigned, ZeroLastApplied:64/unsigned>> = EmptyData,
    ?assertEqual(0, ZeroTerm),
    ?assertEqual(0, ZeroLastApplied),

    % end-to-end test
    TempFile = "test_new_meta", %% TODO - put in the right place
    % fail right here if the fixture cannot be written
    ok = file:write_file(TempFile, Data),
    % remove the fixture even if reading it back crashes
    E2EResult = try
                    fetch_from_file(TempFile)
                after
                    _ = file:delete(TempFile)
                end,
    {ok, {E2EVotedFor, E2ECurrentTerm, E2ELastApplied}} = E2EResult,
    ?assertEqual(VotedFor, E2EVotedFor),
    ?assertEqual(CurrentTerm, E2ECurrentTerm),
    ?assertEqual(LastApplied, E2ELastApplied),

    % Test edge cases

    % very long atom names
    LongName = list_to_atom([$a || _ <- lists:seq(1, 255)]),
    LongNode = list_to_atom([$b || _ <- lists:seq(1, 255)]),
    LongVotedFor = {LongName, LongNode},
    DataLong = encode_metadata(LongVotedFor, 999999, 888888),
    ?assertEqual(1024, byte_size(DataLong)),
    <<"RAM1", VotedForDataLong/binary>> = DataLong,
    ?assertEqual(LongVotedFor, parse_voted_for(VotedForDataLong)),

    % single character atoms
    ShortVotedFor = {a, b},
    DataShort = encode_metadata(ShortVotedFor, 1, 2),
    ?assertEqual(1024, byte_size(DataShort)),
    <<"RAM1", VotedForDataShort/binary>> = DataShort,
    ?assertEqual(ShortVotedFor, parse_voted_for(VotedForDataShort)),

    % max values are handled
    MaxTerm = 18446744073709551615, % 2^64 - 1
    MaxApplied = 18446744073709551615,
    DataMax = encode_metadata(VotedFor, MaxTerm, MaxApplied),
    ?assertEqual(1024, byte_size(DataMax)),
    <<_:1008/binary, ParsedMaxTerm:64/unsigned, ParsedMaxApplied:64/unsigned>> = DataMax,
    ?assertEqual(MaxTerm, ParsedMaxTerm),
    ?assertEqual(MaxApplied, ParsedMaxApplied),

    % invalid magic header
    BadHeaderData = <<"ACME", VotedForBin/binary>>,
    TempFileBadHeader = "test_bad_header", %% TODO path
    ok = file:write_file(TempFileBadHeader, BadHeaderData),
    BadHeaderResult = try
                          fetch_from_file(TempFileBadHeader)
                      after
                          _ = file:delete(TempFileBadHeader)
                      end,
    ?assertEqual({error, invalid_format}, BadHeaderResult),

    ok.

-endif.
18 changes: 18 additions & 0 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,22 @@ setup_log() ->
fun(_Data, _OutOf, _Flag, SS) ->
{ok, SS}
end),
meck:expect(ra_server_meta, path, fun(_, U) -> U end),
meck:expect(ra_server_meta, fetch, fun(P, _, _) ->
case get(P) of
undefined ->
{ok, {undefined, 0, 0}};
Metadata ->
{ok, Metadata}
end
end),
meck:expect(ra_server_meta, store_sync, fun (P, V, T, L) ->
put(P, {V, T, L}), ok
end),
meck:expect(ra_server_meta, update_last_applied, fun (P, L) ->
{V, T, _} = get(P),
put(P, {V, T, L}), ok
end),
meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end),
meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end),
meck:expect(ra_log, snapshot_state, fun ra_log_memory:snapshot_state/1),
Expand Down Expand Up @@ -215,6 +231,7 @@ init_test(_Config) ->
ok = ra_log_meta:store(ra_log_meta, UId, voted_for, some_server),
ok = ra_log_meta:store(ra_log_meta, UId, current_term, CurrentTerm),
meck:expect(ra_log, init, fun (_) -> Log0 end),
meck:expect(ra_server_meta, fetch, fun(_, _, _) -> {ok, {some_server, 5, 0}} end),
#{current_term := 5,
voted_for := some_server} = ra_server_init(InitConf),
% snapshot
Expand Down Expand Up @@ -3175,6 +3192,7 @@ base_state(NumServers, MacMod) ->
},
#{cfg => Cfg,
leader_id => ?N1,
meta_fd => fake_fd,
cluster => Servers,
cluster_index_term => {0, 0},
cluster_change_permitted => true,
Expand Down