4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -22,11 +22,11 @@ jobs:
needs: build
runs-on: self-hosted
strategy:
fail-fast: true
fail-fast: false
max-parallel: 12
matrix:
core_test_mod: [
## Long-running tests. Put these first to limit the overall runtime of the
## Long-running tests. Put these first to limit the overall runtime of the
## test suite
ar_coordinated_mining_tests,
ar_data_sync_tests,
4 changes: 3 additions & 1 deletion apps/arweave/include/ar_data_sync.hrl
@@ -211,5 +211,7 @@
%% fragmentation.
store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD,
%% Cache mapping peers to /data_sync_record responses
all_peers_intervals = #{}
all_peers_intervals = #{},
%% List of local peers used to check if we need to skip block verification.
local_peers = []
}).
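For context, the new field is read with a plain membership check against the requesting peer (see the ar_data_sync.erl changes below). A minimal sketch, assuming the #sync_data_state{} record defined in this header (is_local_peer/2 is an illustrative helper, not part of the patch):

    %% Sketch only: true when Peer is one of the configured local peers.
    is_local_peer(Peer, #sync_data_state{ local_peers = LocalPeers }) ->
        lists:member(Peer, LocalPeers).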
40 changes: 29 additions & 11 deletions apps/arweave/src/ar_data_sync.erl
@@ -656,7 +656,8 @@ init({"default" = StoreID, _}) ->
weave_size = maps:get(weave_size, StateMap),
disk_pool_cursor = first,
disk_pool_threshold = DiskPoolThreshold,
store_id = StoreID
store_id = StoreID,
local_peers = Config#config.local_peers
},
timer:apply_interval(?REMOVE_EXPIRED_DATA_ROOTS_FREQUENCY_MS, ?MODULE,
remove_expired_disk_pool_data_roots, []),
@@ -691,18 +692,20 @@ init({StoreID, RepackInPlacePacking}) ->
process_flag(trap_exit, true),
[ok, ok] = ar_events:subscribe([node_state, disksup]),
State = init_kv(StoreID),
{ok, Config} = application:get_env(arweave, config),
State2 = State#sync_data_state{local_peers = Config#config.local_peers},
case RepackInPlacePacking of
none ->
gen_server:cast(self(), process_store_chunk_queue),
{RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID),
State2 = State#sync_data_state{
State3 = State2#sync_data_state{
store_id = StoreID,
range_start = RangeStart,
range_end = RangeEnd
},
{ok, may_be_start_syncing(State2)};
{ok, may_be_start_syncing(State3)};
_ ->
{ok, State}
{ok, State2}
end.

handle_cast({move_data_root_index, Cursor, N}, State) ->
@@ -1087,6 +1090,7 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
Offset = SeekByte - BlockStartOffset,
ValidateDataPathRuleset = ar_poa:get_data_path_validation_ruleset(BlockStartOffset,
get_merkle_rebase_threshold()),
IsLocalPeer = lists:member(Peer, State#sync_data_state.local_peers),
case validate_proof(TXRoot, BlockStartOffset, Offset, BlockSize, Proof,
ValidateDataPathRuleset) of
{need_unpacking, AbsoluteOffset, ChunkArgs, VArgs} ->
@@ -1098,6 +1102,8 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
true ->
decrement_chunk_cache_size(),
{noreply, State};
false when IsLocalPeer ->
process_valid_fetched_chunk(ChunkArgs, Args, State);
false ->
case ar_packing_server:is_buffer_full() of
true ->
@@ -1115,8 +1121,9 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
{AbsoluteOffset, unpacked}}),
{noreply, State#sync_data_state{
packing_map = PackingMap#{
{AbsoluteOffset, unpacked} => {unpack_fetched_chunk,
Args} } }}
{AbsoluteOffset, unpacked} => {unpack_fetched_chunk, Args}
}
}}
end
end;
false ->
@@ -1877,8 +1884,11 @@ get_tx_offset_data_in_range2(TXOffsetIndex, TXIndex, Start, End) ->
get_tx_data(Start, End, Chunks) when Start >= End ->
{ok, iolist_to_binary(Chunks)};
get_tx_data(Start, End, Chunks) ->
case get_chunk(Start + 1, #{ pack => true, packing => unpacked,
bucket_based_offset => false }) of
case get_chunk(Start + 1, #{
pack => true,
packing => unpacked,
bucket_based_offset => false
}) of
{ok, #{ chunk := Chunk }} ->
get_tx_data(Start + byte_size(Chunk), End, [Chunks | Chunk]);
{error, chunk_not_found} ->
@@ -2829,7 +2839,7 @@ process_invalid_fetched_chunk(Peer, Byte, State) ->

process_valid_fetched_chunk(ChunkArgs, Args, State) ->
#sync_data_state{ store_id = StoreID } = State,
{Packing, UnpackedChunk, AbsoluteEndOffset, TXRoot, ChunkSize} = ChunkArgs,
{FetchedPacking, FetchedChunk, AbsoluteEndOffset, TXRoot, ChunkSize} = ChunkArgs,
{AbsoluteTXStartOffset, TXSize, DataPath, TXPath, DataRoot, Chunk, _ChunkID,
ChunkEndOffset, Peer, Byte} = Args,
case is_chunk_proof_ratio_attractive(ChunkSize, TXSize, DataPath) of
@@ -2845,11 +2855,19 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) ->
%% The chunk has been synced by another job already.
decrement_chunk_cache_size(),
{noreply, State};
false when FetchedPacking =/= unpacked ->
%% We do not have the unpacked chunk, so a repack may be needed.
true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset,
pack_and_store_chunk({DataRoot, AbsoluteEndOffset, TXPath, TXRoot,
DataPath, FetchedPacking, ChunkEndOffset, ChunkSize, Chunk,
none, none, none}, State);
false ->
%% Process the unpacked chunk.
true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset,
pack_and_store_chunk({DataRoot, AbsoluteEndOffset, TXPath, TXRoot,
DataPath, Packing, ChunkEndOffset, ChunkSize, Chunk,
UnpackedChunk, none, none}, State)
DataPath, FetchedPacking, ChunkEndOffset, ChunkSize, Chunk,
FetchedChunk, none, none}, State)
end
end.

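Taken together, the store_fetched_chunk and process_valid_fetched_chunk changes above add a trusted fast path: a chunk fetched from a configured local peer skips the unpack/repack scheduling and is handed straight to process_valid_fetched_chunk/3, which only triggers a repack when the fetched packing is not already unpacked. A condensed sketch of the fetch-side control flow (AlreadyStored and schedule_unpack are placeholders for the surrounding logic, not names from the codebase):

    IsLocalPeer = lists:member(Peer, State#sync_data_state.local_peers),
    case {AlreadyStored, IsLocalPeer} of
        {true, _} ->
            %% Another job has already synced this chunk.
            decrement_chunk_cache_size(),
            {noreply, State};
        {false, true} ->
            %% Trusted local peer: store the fetched chunk without unpacking it first.
            process_valid_fetched_chunk(ChunkArgs, Args, State);
        {false, false} ->
            %% Untrusted peer: queue the chunk for unpacking before storing it.
            schedule_unpack(AbsoluteOffset, Args, State)
    end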
19 changes: 11 additions & 8 deletions apps/arweave/src/ar_http_iface_middleware.erl
@@ -190,7 +190,7 @@ handle(<<"GET">>, [<<"recent">>], Req, _Pid) ->
true ->
{200, #{}, ar_serialize:jsonify(ar_info:get_recent()), Req}
end;

handle(<<"GET">>, [<<"is_tx_blacklisted">>, EncodedTXID], Req, _Pid) ->
case ar_util:safe_decode(EncodedTXID) of
{error, invalid} ->
@@ -2008,21 +2008,24 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
{Packing, ok};
{{true, _}, _StoreID} ->
{ok, Config} = application:get_env(arweave, config),
case lists:member(pack_served_chunks, Config#config.enable) of
false ->
{none, {reply, {404, #{}, <<>>, Req}}};
IsPackServedChunks = lists:member(pack_served_chunks, Config#config.enable),
Peer = ar_http_util:arweave_peer(Req),
IsLocalPeerAddr = lists:member(Peer, Config#config.local_peers),

case IsPackServedChunks orelse IsLocalPeerAddr of
Collaborator:
Should this be andalso? I forget where we landed

Like I could see it working like this:

  • server sets pack_served_chunks, in which case it only repacks chunks for members of its local_peers
  • client sets request_packed_chunks and sync_from_local_peers_only, in which case it requests packed chunks from its local_peers and does not validate them before storing.

Contributor Author:
We decided that we pack served chunks for local peers even when the option is not set.

Collaborator:
I think we should always require the option to be set. Unless you and Lev discussed otherwise?

i.e. there are a lot of scenarios where local_peers are set for other reasons (e.g. to reduce rate limiting between CM cluster peers), and in those cases we probably don't want to also pack served chunks unless it's desired (e.g. we wouldn't want a CM Exit Node to get bogged down repacking accidentally).

Contributor Author:
We discussed this a few times:

  • CM exit node will unlikely have any data at all;
  • local_peers are considered trusted and therefore any requested packing is served.

Collaborator (@JamesPiechota), Nov 26, 2024:
I honestly don't remember what we discussed. But I think we should require enable pack_served_chunks to be set. My understanding is that we use local_peers as a filter on top of that so that nodes don't end up packing for peers they don't want to. But we still need enable pack_served_chunks.

The client can trust all data it receives from one of its local_peers though. That makes sense.

Collaborator:
Just chatted on Slack. The current approach as implemented works: clients will only request packed chunks if they have fetch_packed_chunks set, but if they do request a packed chunk, the local_peer will repack.

true ->
ok = ar_semaphore:acquire(get_and_pack_chunk,
infinity),
{RequestedPacking, ok}
ok = ar_semaphore:acquire(get_and_pack_chunk, infinity),
{RequestedPacking, ok};
false ->
{none, {reply, {404, #{}, <<>>, Req}}}
end
end,
case CheckRecords of
{reply, Reply} ->
Reply;
ok ->
Args = #{ packing => ReadPacking,
bucket_based_offset => IsBucketBasedOffset },
bucket_based_offset => IsBucketBasedOffset, pack => true },
case ar_data_sync:get_chunk(Offset, Args) of
{ok, Proof} ->
Proof2 = maps:remove(unpacked_chunk,
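The resolution of the thread above matches the implemented check: a packed chunk is served when either the pack_served_chunks feature is enabled or the requester is one of the node's configured local_peers. A minimal sketch of that rule (may_serve_packed_chunk/2 is an illustrative helper, not part of the patch):

    may_serve_packed_chunk(Req, Config) ->
        IsPackServedChunks = lists:member(pack_served_chunks, Config#config.enable),
        Peer = ar_http_util:arweave_peer(Req),
        IsLocalPeer = lists:member(Peer, Config#config.local_peers),
        IsPackServedChunks orelse IsLocalPeer.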