.github/workflows/test.yml (2 additions & 2 deletions)

@@ -22,11 +22,11 @@ jobs:
     needs: build
     runs-on: self-hosted
     strategy:
-      fail-fast: true
+      fail-fast: false
       max-parallel: 12
       matrix:
         core_test_mod: [
-          ## Long-running tests. Put these first to limit the overall runtime of the
+          ## Long-running tests. Put these first to limit the overall runtime of the
           ## test suite
           ar_coordinated_mining_tests,
           ar_data_sync_tests,
apps/arweave/include/ar_data_sync.hrl (3 additions & 1 deletion)

@@ -211,5 +211,7 @@
     %% fragmentation.
     store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD,
     %% Cache mapping peers to /data_sync_record responses
-    all_peers_intervals = #{}
+    all_peers_intervals = #{},
+    %% List of local peers used to check if we need to skip block verification.
+    local_peers = []
 }).
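Note: the new local_peers field mirrors Config#config.local_peers and is consulted with a plain membership check. A minimal sketch of that check, assuming the usual Arweave {A, B, C, D, Port} peer tuples (the exact representation comes from the config parser, not from this PR; the values below are made up):

    %% Sketch only; in the PR, LocalPeers comes from Config#config.local_peers.
    LocalPeers = [{127,0,0,1,1984}, {10,0,2,15,1984}],
    IsLocalPeer = lists:member({127,0,0,1,1984}, LocalPeers).
    %% IsLocalPeer =:= true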
apps/arweave/src/ar_data_sync.erl (29 additions & 11 deletions)

@@ -656,7 +656,8 @@ init({"default" = StoreID, _}) ->
         weave_size = maps:get(weave_size, StateMap),
         disk_pool_cursor = first,
         disk_pool_threshold = DiskPoolThreshold,
-        store_id = StoreID
+        store_id = StoreID,
+        local_peers = Config#config.local_peers
     },
     timer:apply_interval(?REMOVE_EXPIRED_DATA_ROOTS_FREQUENCY_MS, ?MODULE,
             remove_expired_disk_pool_data_roots, []),
@@ -691,18 +692,20 @@ init({StoreID, RepackInPlacePacking}) ->
     process_flag(trap_exit, true),
     [ok, ok] = ar_events:subscribe([node_state, disksup]),
     State = init_kv(StoreID),
+    {ok, Config} = application:get_env(arweave, config),
+    State2 = State#sync_data_state{local_peers = Config#config.local_peers},
     case RepackInPlacePacking of
         none ->
             gen_server:cast(self(), process_store_chunk_queue),
             {RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID),
-            State2 = State#sync_data_state{
+            State3 = State2#sync_data_state{
                 store_id = StoreID,
                 range_start = RangeStart,
                 range_end = RangeEnd
             },
-            {ok, may_be_start_syncing(State2)};
+            {ok, may_be_start_syncing(State3)};
         _ ->
-            {ok, State}
+            {ok, State2}
     end.

 handle_cast({move_data_root_index, Cursor, N}, State) ->
@@ -1087,6 +1090,7 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
             Offset = SeekByte - BlockStartOffset,
             ValidateDataPathRuleset = ar_poa:get_data_path_validation_ruleset(BlockStartOffset,
                     get_merkle_rebase_threshold()),
+            IsLocalPeer = lists:member(Peer, State#sync_data_state.local_peers),
             case validate_proof(TXRoot, BlockStartOffset, Offset, BlockSize, Proof,
                     ValidateDataPathRuleset) of
                 {need_unpacking, AbsoluteOffset, ChunkArgs, VArgs} ->
@@ -1098,6 +1102,8 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
                     true ->
                         decrement_chunk_cache_size(),
                         {noreply, State};
+                    false when IsLocalPeer ->
+                        process_valid_fetched_chunk(ChunkArgs, Args, State);
                     false ->
                         case ar_packing_server:is_buffer_full() of
                             true ->
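Note on the two new clauses above: a chunk fetched from a configured local peer now skips the unpack-and-validate detour through the packing map and is handed straight to process_valid_fetched_chunk/3. A sketch of the dispatch with hypothetical names (dispatch_fetched_chunk/2 does not exist in the PR):

    %% Hypothetical helper illustrating the decision added above.
    dispatch_fetched_chunk(IsStored, IsLocalPeer) ->
        case IsStored of
            true -> drop;                    %% already synced by another job
            false when IsLocalPeer -> store; %% trusted local peer: skip unpacking
            false -> queue_for_unpacking     %% remote peer: unpack and validate first
        end.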
@@ -1115,8 +1121,9 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
                                     {AbsoluteOffset, unpacked}}),
                             {noreply, State#sync_data_state{
                                 packing_map = PackingMap#{
-                                    {AbsoluteOffset, unpacked} => {unpack_fetched_chunk,
-                                            Args} } }}
+                                    {AbsoluteOffset, unpacked} => {unpack_fetched_chunk, Args}
+                                }
+                            }}
                     end
             end;
         false ->
@@ -1877,8 +1884,11 @@ get_tx_offset_data_in_range2(TXOffsetIndex, TXIndex, Start, End) ->
 get_tx_data(Start, End, Chunks) when Start >= End ->
     {ok, iolist_to_binary(Chunks)};
 get_tx_data(Start, End, Chunks) ->
-    case get_chunk(Start + 1, #{ pack => true, packing => unpacked,
-            bucket_based_offset => false }) of
+    case get_chunk(Start + 1, #{
+            pack => true,
+            packing => unpacked,
+            bucket_based_offset => false
+    }) of
         {ok, #{ chunk := Chunk }} ->
             get_tx_data(Start + byte_size(Chunk), End, [Chunks | Chunk]);
         {error, chunk_not_found} ->
@@ -2829,7 +2839,7 @@ process_invalid_fetched_chunk(Peer, Byte, State) ->

 process_valid_fetched_chunk(ChunkArgs, Args, State) ->
     #sync_data_state{ store_id = StoreID } = State,
-    {Packing, UnpackedChunk, AbsoluteEndOffset, TXRoot, ChunkSize} = ChunkArgs,
+    {FetchedPacking, FetchedChunk, AbsoluteEndOffset, TXRoot, ChunkSize} = ChunkArgs,
     {AbsoluteTXStartOffset, TXSize, DataPath, TXPath, DataRoot, Chunk, _ChunkID,
             ChunkEndOffset, Peer, Byte} = Args,
     case is_chunk_proof_ratio_attractive(ChunkSize, TXSize, DataPath) of
@@ -2845,11 +2855,19 @@ process_valid_fetched_chunk(ChunkArgs, Args, State) ->
                 %% The chunk has been synced by another job already.
                 decrement_chunk_cache_size(),
                 {noreply, State};
+            false when FetchedPacking =/= unpacked ->
+                %% We do not have an unpacked copy of the chunk; a repack may be needed.
+                true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset,
+                pack_and_store_chunk({DataRoot, AbsoluteEndOffset, TXPath, TXRoot,
+                        DataPath, FetchedPacking, ChunkEndOffset, ChunkSize, Chunk,
+                        none, none, none}, State);
             false ->
+                %% The fetched chunk is already unpacked; pass the unpacked copy
+                %% along to be packed and stored.
                 true = AbsoluteEndOffset == AbsoluteTXStartOffset + ChunkEndOffset,
                 pack_and_store_chunk({DataRoot, AbsoluteEndOffset, TXPath, TXRoot,
-                        DataPath, Packing, ChunkEndOffset, ChunkSize, Chunk,
-                        UnpackedChunk, none, none}, State)
+                        DataPath, FetchedPacking, ChunkEndOffset, ChunkSize, Chunk,
+                        FetchedChunk, none, none}, State)
         end
     end.

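A subtlety in the last hunk: Erlang tries case clauses top to bottom, so the guarded `false when FetchedPacking =/= unpacked` clause must precede the bare `false` clause, which is now reached only for chunks fetched already unpacked. A minimal illustration of the clause-ordering rule (hypothetical function, not in the PR):

    classify(IsStored, FetchedPacking) ->
        case IsStored of
            true -> already_stored;
            %% Tried before the bare clause below; guards are checked in order.
            false when FetchedPacking =/= unpacked -> repack_without_unpacked_copy;
            false -> store_with_unpacked_copy
        end.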
apps/arweave/src/ar_http_iface_middleware.erl (11 additions & 8 deletions)

@@ -190,7 +190,7 @@ handle(<<"GET">>, [<<"recent">>], Req, _Pid) ->
         true ->
             {200, #{}, ar_serialize:jsonify(ar_info:get_recent()), Req}
     end;
-
+
 handle(<<"GET">>, [<<"is_tx_blacklisted">>, EncodedTXID], Req, _Pid) ->
     case ar_util:safe_decode(EncodedTXID) of
         {error, invalid} ->
@@ -2008,21 +2008,24 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
                     {Packing, ok};
                 {{true, _}, _StoreID} ->
                     {ok, Config} = application:get_env(arweave, config),
-                    case lists:member(pack_served_chunks, Config#config.enable) of
-                        false ->
-                            {none, {reply, {404, #{}, <<>>, Req}}};
+                    IsPackServedChunks = lists:member(pack_served_chunks, Config#config.enable),
+                    Peer = ar_http_util:arweave_peer(Req),
+                    IsLocalPeerAddr = lists:member(Peer, Config#config.local_peers),
+
+                    case IsPackServedChunks orelse IsLocalPeerAddr of
                         true ->
-                            ok = ar_semaphore:acquire(get_and_pack_chunk,
-                                    infinity),
-                            {RequestedPacking, ok}
+                            ok = ar_semaphore:acquire(get_and_pack_chunk, infinity),
+                            {RequestedPacking, ok};
+                        false ->
+                            {none, {reply, {404, #{}, <<>>, Req}}}
                     end
             end,
     case CheckRecords of
         {reply, Reply} ->
             Reply;
         ok ->
             Args = #{ packing => ReadPacking,
-                    bucket_based_offset => IsBucketBasedOffset },
+                    bucket_based_offset => IsBucketBasedOffset, pack => true },
             case ar_data_sync:get_chunk(Offset, Args) of
                 {ok, Proof} ->
                     Proof2 = maps:remove(unpacked_chunk,
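The effect of this hunk: a GET /chunk request that requires repacking on the fly is now served when either the pack_served_chunks feature is enabled or the requester is a configured local peer. Condensed as a sketch (may_serve_packed_chunk/2 is a hypothetical name; both config fields are real, and the #config record comes from the node's config header):

    may_serve_packed_chunk(Config, Peer) ->
        lists:member(pack_served_chunks, Config#config.enable)
            orelse lists:member(Peer, Config#config.local_peers).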
apps/arweave/src/ar_metrics.erl (3 additions & 2 deletions)

@@ -158,10 +158,11 @@ register() ->
     ]),
     prometheus_gauge:new([
         {name, v2_index_data_size_by_packing},
-        {labels, [store_id, packing, partition_number, storage_module_size, storage_module_index]},
+        {labels, [store_id, packing, partition_number, storage_module_size, storage_module_index,
+                packing_difficulty]},
         {help, "The size (in bytes) of the data stored and indexed. Grouped by the "
                 "store ID, packing, partition number, storage module size, "
-                "and storage module index."}
+                "storage module index, and packing difficulty."}
     ]),

     %% Disk pool.
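With the extra label, every update of this gauge must now pass six label values, in the declared order. A hedged example of what a call site would look like, using prometheus.erl's prometheus_gauge:set/3 (the label values below are made up for illustration):

    %% Order must match the {labels, [...]} declaration: store_id, packing,
    %% partition_number, storage_module_size, storage_module_index,
    %% packing_difficulty.
    DataSize = 123456789,
    prometheus_gauge:set(v2_index_data_size_by_packing,
            ["storage_module_0", unpacked, 0, 3600000000000, 0, 0],
            DataSize).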