Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 79 additions & 24 deletions apps/arweave/src/ar_http_iface_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1964,21 +1964,28 @@ handle_post_tx(Req, Peer, TX) ->
end.

%% Post-processing for a POST /tx request whose transaction was accepted.
%% Req is the HTTP request, TX the accepted #tx{} record, and Peer a 5-tuple
%% whose first four elements are handed to the blacklist middleware as the
%% address {A, B, C, D}. Always returns ok.
handle_post_tx_accepted(Req, TX, Peer) ->
%% Exclude successful requests with valid transactions from the
%% IP-based throttling, to avoid connectivity issues at the times
%% of excessive transaction volumes.
{A, B, C, D, _} = Peer,
ar_blacklist_middleware:decrement_ip_addr({A, B, C, D}, Req),
%% Rate the sending peer using the body read time (converted from native
%% time units to microseconds) and the byte size of the serialized TX term.
BodyReadTime = ar_http_req:body_read_time(Req),
ar_peers:rate_gossiped_data(Peer, tx,
erlang:convert_time_unit(BodyReadTime, native, microsecond),
byte_size(term_to_binary(TX))),
%% Announce the new transaction, tagged as pushed by this peer.
ar_events:send(tx, {new, TX, {pushed, Peer}}),
TXID = TX#tx.id,
%% The reference is read back from the process dictionary under tx_id_ref
%% and its registry entry is replaced by a temporary entry for the same ID.
Ref = erlang:get(tx_id_ref),
ar_ignore_registry:remove_ref(TXID, Ref),
%% NOTE(review): 10 * 60 * 1000 is presumably a timeout in milliseconds
%% (10 minutes) - confirm against ar_ignore_registry:add_temporary/2.
ar_ignore_registry:add_temporary(TXID, 10 * 60 * 1000),
ok.
%% Exclude successful requests with valid transactions from the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small thing: the Arweave codebase uses tabs for indentation.

%% IP-based throttling, to avoid connectivity issues at the times
%% of excessive transaction volumes.
{A, B, C, D, _} = Peer,
ar_blacklist_middleware:decrement_ip_addr({A, B, C, D}, Req),
BodyReadTime = ar_http_req:body_read_time(Req),
ar_peers:rate_gossiped_data(Peer, tx,
erlang:convert_time_unit(BodyReadTime, native, microsecond),
byte_size(term_to_binary(TX))),
ar_events:send(tx, {new, TX, {pushed, Peer}}),
TXID = TX#tx.id,
Ref = erlang:get(tx_id_ref),
ar_ignore_registry:remove_ref(TXID, Ref),
ar_ignore_registry:add_temporary(TXID, 10 * 60 * 1000),
% Mark as processed with data if this is a format 2 tx with data
case TX#tx.format == 2 andalso byte_size(TX#tx.data) > 0 of
true ->
ar_ignore_registry:add_with_data(TXID);
false ->
ok
end,
ok.

%% Build the fixed error_response tuple used when transaction verification
%% fails: status 400, an empty map (presumably the response headers) and a
%% constant message body.
handle_post_tx_verification_response() ->
	Status = 400,
	NoHeaders = #{},
	Message = <<"Transaction verification failed.">>,
	{error_response, {Status, NoHeaders, Message}}.
Expand Down Expand Up @@ -2994,15 +3001,24 @@ post_tx_parse_id(check_header, {Req, Pid, Encoding}) ->
end
end;
post_tx_parse_id(check_ignore_list, {TXID, Req, Pid, Encoding}) ->
case ar_mempool:is_known_tx(TXID) of
true ->
{error, tx_already_processed, TXID, Req};
false ->
Ref = make_ref(),
erlang:put(tx_id_ref, Ref),
ar_ignore_registry:add_ref(TXID, Ref, 5000),
post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding})
end;
case ar_mempool:is_known_tx(TXID) of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks the mechanism where we process every tx only once and thus makes the network amplify spam.

We would need to do something like a two-tier ignore registry where a tx with data is accepted if it is not yet recorded in the second tier.

true ->
% Check if this is a format 2 tx with data that we haven't processed with data yet
case should_accept_tx_with_data(TXID, Req) of
true ->
Ref = make_ref(),
erlang:put(tx_id_ref, Ref),
ar_ignore_registry:add_ref(TXID, Ref, 5000),
post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding});
false ->
{error, tx_already_processed, TXID, Req}
end;
false ->
Ref = make_ref(),
erlang:put(tx_id_ref, Ref),
ar_ignore_registry:add_ref(TXID, Ref, 5000),
post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding})
end;
post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding}) ->
case read_complete_body(Req, Pid) of
{ok, Body, Req2} ->
Expand All @@ -3017,6 +3033,25 @@ post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding}) ->
{error, timeout} ->
{error, timeout}
end;
%% NOTE(review): this clause has exactly the same head as the earlier
%% post_tx_parse_id(check_ignore_list, {TXID, Req, Pid, Encoding}) clause of
%% this function. Erlang picks the first matching clause, so this copy appears
%% to be unreachable (the compiler warns that such a clause cannot match) and
%% should most likely be removed.
%%
%% check_ignore_list step: decide whether the body of a posted transaction
%% should be read at all. Returns {error, tx_already_processed, TXID, Req} or
%% continues with the read_body step.
post_tx_parse_id(check_ignore_list, {TXID, Req, Pid, Encoding}) ->
case ar_mempool:is_known_tx(TXID) of
true ->
% Check if this is a format 2 tx with data that we haven't processed with data yet
case should_accept_tx_with_data(TXID, Req) of
true ->
%% Remember a fresh reference in the process dictionary (tx_id_ref)
%% and register it for this TXID before reading the body.
Ref = make_ref(),
erlang:put(tx_id_ref, Ref),
%% NOTE(review): 5000 is presumably a timeout in milliseconds - confirm
%% against ar_ignore_registry:add_ref/3.
ar_ignore_registry:add_ref(TXID, Ref, 5000),
post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding});
false ->
{error, tx_already_processed, TXID, Req}
end;
false ->
%% Unknown transaction: same registration as above, then read the body.
Ref = make_ref(),
erlang:put(tx_id_ref, Ref),
ar_ignore_registry:add_ref(TXID, Ref, 5000),
post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding})
end;
post_tx_parse_id(parse_json, {TXID, Req, Body}) ->
Ref = erlang:get(tx_id_ref),
case catch ar_serialize:json_struct_to_tx(Body) of
Expand Down Expand Up @@ -3105,6 +3140,26 @@ post_tx_parse_id(verify_id_match, {MaybeTXID, Req, TX}) ->
end
end.

%% Decide whether a POST for an already-known transaction should still be read
%% because it may carry data that has not been processed yet.
%%
%% Returns false when the registry already holds a "with data" record for TXID.
%% Otherwise returns true only if the request's content-length header parses as
%% an integer greater than 1000 (the original's rough size estimate for a
%% transaction with meaningful data). A missing header is treated as "0", and an
%% unparsable header yields false.
should_accept_tx_with_data(TXID, Req) ->
	case ar_ignore_registry:permanent_member_with_data(TXID) of
		false ->
			Declared = cowboy_req:header(<<"content-length">>, Req, <<"0">>),
			%% "0" needs no special case: 0 is not greater than the threshold.
			try binary_to_integer(Declared) of
				Length ->
					Length > 1000
			catch
				_:_ ->
					false
			end;
		true ->
			%% Already processed together with its data.
			false
	end.
handle_post_vdf(Req, Pid) ->
Peer = ar_http_util:arweave_peer(Req),
case ets:member(ar_peers, {vdf_server_peer, Peer}) of
Expand Down
13 changes: 13 additions & 0 deletions apps/arweave/src/ar_ignore_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,16 @@ member(ID) ->
permanent_member(ID) ->
	%% Fetch every entry stored under ID in ignored_ids and report whether one
	%% of them is exactly the {ID, permanent} marker.
	IsPermanent = fun(Entry) -> Entry =:= {ID, permanent} end,
	lists:any(IsPermanent, ets:lookup(ignored_ids, ID)).

%% @doc Record that ID has been processed together with its data by inserting
%% the {ID, permanent_with_data} entry into the ignored_ids table.
add_with_data(ID) ->
	Marker = {ID, permanent_with_data},
	ets:insert(ignored_ids, Marker).

%% @doc Check if there is a permanent record with data in the registry.
%% Returns true when the entries stored under ID in ignored_ids include
%% {ID, permanent_with_data}, and false otherwise.
%%
%% The whole lookup result is searched, the same way permanent_member/1 does,
%% because an ID may have other entries stored next to the data marker. Matching
%% only a single-element result would report false in that case.
permanent_member_with_data(ID) ->
	lists:member({ID, permanent_with_data}, ets:lookup(ignored_ids, ID)).
52 changes: 51 additions & 1 deletion apps/arweave/test/ar_http_iface_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ batch_test_() ->
test_register(
fun test_fallback_to_block_endpoint_if_cannot_send_tx/1, GenesisData),
test_register(fun test_get_recent_hash_list_diff/1, GenesisData),
test_register(fun test_get_total_supply/1, GenesisData)
test_register(fun test_get_total_supply/1, GenesisData),
test_register(fun test_import_missing_data_for_existing_tx/1, GenesisData)
]}
end
}.
Expand Down Expand Up @@ -1022,5 +1023,54 @@ wait_until_syncs_tx_data(TXID) ->
10000
).

%% Post a format 2 transaction first without its data and then again with the
%% data attached. The second post must be answered with 200 OK, the data must
%% become retrievable after a block is mined, and a further identical post must
%% be answered with 208 "Transaction already processed.".
test_import_missing_data_for_existing_tx(_) ->
	StartHeight = ar_node:get_height(),
	%% Build a format 2 transaction carrying a small payload.
	Payload = <<"TEST DATA FOR IMPORT">>,
	#tx{ data_root = Root } = ar_tx:generate_chunk_tree(#tx{ data = Payload }),
	FullTX = (ar_tx:new(Payload))#tx{
		format = 2,
		data_root = Root,
		data_size = byte_size(Payload)
	},
	TXID = FullTX#tx.id,
	%% Every post in this test goes to the main node as JSON.
	PostTX = fun(T) ->
		ar_http_iface_client:send_tx_json(ar_test_node:peer_ip(main), T#tx.id,
				ar_serialize:jsonify(ar_serialize:tx_to_json_struct(T)))
	end,

	%% Step 1: post the transaction stripped of its data (simulating gossip).
	HeaderOnlyTX = FullTX#tx{ data = <<>> },
	PostTX(HeaderOnlyTX),
	wait_until_receives_txs([HeaderOnlyTX]),

	%% Step 2: the transaction is known, but its data endpoint answers 404.
	DataPath = "/tx/" ++ binary_to_list(ar_util:encode(TXID)) ++ "/data",
	?assertMatch(
		{ok, {{<<"404">>, _}, _, _, _, _}},
		ar_http:req(#{
			method => get,
			peer => ar_test_node:peer_ip(main),
			path => DataPath
		})
	),

	%% Step 3: post the same transaction again, this time including the data.
	?assertMatch({ok, {{<<"200">>, _}, _, <<"OK">>, _, _}}, PostTX(FullTX)),

	%% Step 4: mine a block and wait for the data to be served.
	ar_test_node:mine(),
	wait_until_height(main, StartHeight + 1),
	{ok, Served} = wait_until_syncs_tx_data(TXID),
	?assertEqual(ar_util:encode(Payload), Served),

	%% Step 5: one more identical post is reported as already processed.
	?assertMatch(
		{ok, {{<<"208">>, _}, _, <<"Transaction already processed.">>, _, _}},
		PostTX(FullTX)
	).

%% Ask Node for ar_node:get_height() through ar_test_node:remote_call/4 and
%% return whatever that call returns.
height(Node) ->
	NoArgs = [],
	ar_test_node:remote_call(Node, ar_node, get_height, NoArgs).