diff --git a/apps/arweave/src/ar_http_iface_middleware.erl b/apps/arweave/src/ar_http_iface_middleware.erl
index 7121202b1..41e4c56c7 100644
--- a/apps/arweave/src/ar_http_iface_middleware.erl
+++ b/apps/arweave/src/ar_http_iface_middleware.erl
@@ -1964,21 +1964,28 @@ handle_post_tx(Req, Peer, TX) ->
 	end.
 
 handle_post_tx_accepted(Req, TX, Peer) ->
 	%% Exclude successful requests with valid transactions from the
 	%% IP-based throttling, to avoid connectivity issues at the times
 	%% of excessive transaction volumes.
 	{A, B, C, D, _} = Peer,
 	ar_blacklist_middleware:decrement_ip_addr({A, B, C, D}, Req),
 	BodyReadTime = ar_http_req:body_read_time(Req),
 	ar_peers:rate_gossiped_data(Peer, tx,
 			erlang:convert_time_unit(BodyReadTime, native, microsecond),
 			byte_size(term_to_binary(TX))),
 	ar_events:send(tx, {new, TX, {pushed, Peer}}),
 	TXID = TX#tx.id,
 	Ref = erlang:get(tx_id_ref),
 	ar_ignore_registry:remove_ref(TXID, Ref),
 	ar_ignore_registry:add_temporary(TXID, 10 * 60 * 1000),
+	%% Mark as processed with data if this is a format 2 tx posted with data.
+	case TX#tx.format == 2 andalso byte_size(TX#tx.data) > 0 of
+		true ->
+			ar_ignore_registry:add_with_data(TXID);
+		false ->
+			ok
+	end,
 	ok.
 
 handle_post_tx_verification_response() ->
 	{error_response, {400, #{}, <<"Transaction verification failed.">>}}.
@@ -2994,15 +3001,24 @@ post_tx_parse_id(check_header, {Req, Pid, Encoding}) ->
 			end
 	end;
 post_tx_parse_id(check_ignore_list, {TXID, Req, Pid, Encoding}) ->
 	case ar_mempool:is_known_tx(TXID) of
 		true ->
-			{error, tx_already_processed, TXID, Req};
+			%% Check if this is a format 2 tx whose data we have not processed yet.
+			case should_accept_tx_with_data(TXID, Req) of
+				true ->
+					Ref = make_ref(),
+					erlang:put(tx_id_ref, Ref),
+					ar_ignore_registry:add_ref(TXID, Ref, 5000),
+					post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding});
+				false ->
+					{error, tx_already_processed, TXID, Req}
+			end;
 		false ->
 			Ref = make_ref(),
 			erlang:put(tx_id_ref, Ref),
 			ar_ignore_registry:add_ref(TXID, Ref, 5000),
 			post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding})
 	end;
 post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding}) ->
 	case read_complete_body(Req, Pid) of
 		{ok, Body, Req2} ->
@@ -3105,6 +3121,27 @@ post_tx_parse_id(verify_id_match, {MaybeTXID, Req, TX}) ->
 		end
 	end.
 
+should_accept_tx_with_data(TXID, Req) ->
+	%% Only accept if we have not already processed this tx with data.
+	case ar_ignore_registry:permanent_member_with_data(TXID) of
+		true ->
+			false; % Already processed with data.
+		false ->
+			%% Check if this request has a content-length indicating data.
+			case cowboy_req:header(<<"content-length">>, Req, <<"0">>) of
+				<<"0">> ->
+					false; % No data in the request.
+				ContentLength ->
+					try
+						Size = binary_to_integer(ContentLength),
+						%% Only accept if there is substantial data (> base tx size).
+						Size > 1000 % Rough estimate for a tx with meaningful data.
+					catch
+						_:_ -> false
+					end
+			end
+	end.
+
 handle_post_vdf(Req, Pid) ->
 	Peer = ar_http_util:arweave_peer(Req),
 	case ets:member(ar_peers, {vdf_server_peer, Peer}) of
diff --git a/apps/arweave/src/ar_ignore_registry.erl b/apps/arweave/src/ar_ignore_registry.erl
index f273e5add..ad3131cdf 100644
--- a/apps/arweave/src/ar_ignore_registry.erl
+++ b/apps/arweave/src/ar_ignore_registry.erl
@@ -69,3 +69,12 @@ member(ID) ->
 permanent_member(ID) ->
 	Entries = ets:lookup(ignored_ids, ID),
 	lists:member({ID, permanent}, Entries).
+
+%% @doc Put a permanent ID record with the data flag into the registry.
+add_with_data(ID) ->
+	ets:insert(ignored_ids, {ID, permanent_with_data}).
+
+%% @doc Check if there is a permanent record with the data flag in the registry.
+permanent_member_with_data(ID) ->
+	Entries = ets:lookup(ignored_ids, ID),
+	lists:member({ID, permanent_with_data}, Entries).
diff --git a/apps/arweave/test/ar_http_iface_tests.erl b/apps/arweave/test/ar_http_iface_tests.erl
index 624aa559a..5174a6135 100644
--- a/apps/arweave/test/ar_http_iface_tests.erl
+++ b/apps/arweave/test/ar_http_iface_tests.erl
@@ -108,7 +108,8 @@ batch_test_() ->
 		test_register(
 				fun test_fallback_to_block_endpoint_if_cannot_send_tx/1, GenesisData),
 		test_register(fun test_get_recent_hash_list_diff/1, GenesisData),
-		test_register(fun test_get_total_supply/1, GenesisData)
+		test_register(fun test_get_total_supply/1, GenesisData),
+		test_register(fun test_import_missing_data_for_existing_tx/1, GenesisData)
 	]}
 	end
 }.
@@ -1022,5 +1023,54 @@ wait_until_syncs_tx_data(TXID) ->
 		10000
 	).
 
+test_import_missing_data_for_existing_tx(_) ->
+	LocalHeight = ar_node:get_height(),
+	%% Create a format 2 transaction with data.
+	Data = <<"TEST DATA FOR IMPORT">>,
+	DataRoot = (ar_tx:generate_chunk_tree(#tx{ data = Data }))#tx.data_root,
+	TX = #tx{ id = TXID } = (ar_tx:new(Data))#tx{
+		format = 2,
+		data_root = DataRoot,
+		data_size = byte_size(Data)
+	},
+
+	%% First, post the transaction without data (simulating gossip).
+	TXWithoutData = TX#tx{ data = <<>> },
+	ar_http_iface_client:send_tx_json(ar_test_node:peer_ip(main), TXWithoutData#tx.id,
+			ar_serialize:jsonify(ar_serialize:tx_to_json_struct(TXWithoutData))),
+	wait_until_receives_txs([TXWithoutData]),
+
+	%% Verify the transaction exists but its data is missing.
+	?assertMatch(
+		{ok, {{<<"404">>, _}, _, _, _, _}},
+		ar_http:req(#{
+			method => get,
+			peer => ar_test_node:peer_ip(main),
+			path => "/tx/" ++ binary_to_list(ar_util:encode(TXID)) ++ "/data"
+		})
+	),
+
+	%% Now post the same transaction with its data included.
+	?assertMatch(
+		{ok, {{<<"200">>, _}, _, <<"OK">>, _, _}},
+		ar_http_iface_client:send_tx_json(ar_test_node:peer_ip(main), TX#tx.id,
+				ar_serialize:jsonify(ar_serialize:tx_to_json_struct(TX)))
+	),
+
+	%% Mine the transaction.
+	ar_test_node:mine(),
+	wait_until_height(main, LocalHeight + 1),
+
+	%% Verify the data is now available.
+	{ok, RetrievedData} = wait_until_syncs_tx_data(TXID),
+	?assertEqual(ar_util:encode(Data), RetrievedData),
+
+	%% Verify posting the same transaction again returns 208.
+	?assertMatch(
+		{ok, {{<<"208">>, _}, _, <<"Transaction already processed.">>, _, _}},
+		ar_http_iface_client:send_tx_json(ar_test_node:peer_ip(main), TX#tx.id,
+				ar_serialize:jsonify(ar_serialize:tx_to_json_struct(TX)))
+	).
+
 height(Node) ->
 	ar_test_node:remote_call(Node, ar_node, get_height, []).
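
Note: ar_ignore_registry:add_with_data/1 and ar_ignore_registry:permanent_member_with_data/1 are called from ar_http_iface_middleware, so ar_ignore_registry also needs to export the two new functions; the hunks above do not touch its -export attribute, and without the export both calls fail with an undef error at runtime. A minimal sketch of the assumed addition, written as a second -export attribute so the module's existing export list (not shown in the diff) stays untouched:

%% Exports for the new "processed with data" registry records, called from
%% ar_http_iface_middleware; Erlang allows multiple -export attributes, so this
%% line can sit directly below the existing one.
-export([add_with_data/1, permanent_member_with_data/1]).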