Skip to content

Commit af87226

Browse files
authored
feat: async header response transform (#335)
1 parent 54ed75f commit af87226

File tree

13 files changed

+64
-56
lines changed

13 files changed

+64
-56
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/net/network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ reth-metrics = { workspace = true, features = ["common"] }
6060
metrics.workspace = true
6161

6262
# misc
63+
async-trait.workspace = true
6364
auto_impl.workspace = true
6465
aquamarine.workspace = true
6566
tracing.workspace = true

crates/net/network/src/builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ use crate::{
99
policy::NetworkPolicies,
1010
TransactionPropagationPolicy, TransactionsManager, TransactionsManagerConfig,
1111
},
12-
transform::header::HeaderTransform,
12+
transform::header::HeaderResponseTransform,
1313
NetworkHandle, NetworkManager,
1414
};
1515
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
1616
use reth_network_api::test_utils::PeersHandleProvider;
1717
use reth_transaction_pool::TransactionPool;
18+
use std::sync::Arc;
1819
use tokio::sync::mpsc;
1920

2021
/// We set the max channel capacity of the `EthRequestHandler` to 256
@@ -64,7 +65,7 @@ impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
6465
pub fn request_handler<Client>(
6566
self,
6667
client: Client,
67-
request_header_transform: Option<Box<dyn HeaderTransform<N::BlockHeader>>>,
68+
request_header_transform: Option<Arc<dyn HeaderResponseTransform<N::BlockHeader>>>,
6869
) -> NetworkBuilder<Tx, EthRequestHandler<Client, N>, N> {
6970
let Self { mut network, transactions, .. } = self;
7071
let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY);

crates/net/network/src/eth_requests.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
33
use crate::{
44
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5-
metrics::EthRequestHandlerMetrics, transform::header::HeaderTransform,
5+
metrics::EthRequestHandlerMetrics, transform::header::HeaderResponseTransform,
66
};
77
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
88
use alloy_eips::BlockHashOrNumber;
99
use alloy_rlp::Encodable;
10-
use futures::StreamExt;
10+
use futures::{future::join_all, StreamExt};
1111
use reth_eth_wire::{
1212
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
1313
GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69,
@@ -20,6 +20,7 @@ use reth_storage_api::{BlockReader, HeaderProvider};
2020
use std::{
2121
future::Future,
2222
pin::Pin,
23+
sync::Arc,
2324
task::{Context, Poll},
2425
time::Duration,
2526
};
@@ -54,15 +55,15 @@ pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
5455
#[must_use = "Manager does nothing unless polled."]
5556
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
5657
/// The client type that can interact with the chain.
57-
client: C,
58+
client: Arc<C>,
5859
/// Used for reporting peers.
5960
// TODO use to report spammers
6061
#[expect(dead_code)]
6162
peers: PeersHandle,
6263
/// Incoming request from the [`NetworkManager`](crate::NetworkManager).
6364
incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
6465
/// The header transform to apply to the headers before sending to peers.
65-
header_transform: Option<Box<dyn HeaderTransform<N::BlockHeader>>>,
66+
header_transform: Option<Arc<dyn HeaderResponseTransform<N::BlockHeader>>>,
6667
/// Metrics for the eth request handler.
6768
metrics: EthRequestHandlerMetrics,
6869
}
@@ -74,10 +75,10 @@ impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
7475
client: C,
7576
peers: PeersHandle,
7677
incoming: Receiver<IncomingEthRequest<N>>,
77-
header_transform: Option<Box<dyn HeaderTransform<N::BlockHeader>>>,
78+
header_transform: Option<Arc<dyn HeaderResponseTransform<N::BlockHeader>>>,
7879
) -> Self {
7980
Self {
80-
client,
81+
client: Arc::new(client),
8182
peers,
8283
incoming_requests: ReceiverStream::new(incoming),
8384
header_transform,
@@ -89,20 +90,22 @@ impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
8990
impl<C, N> EthRequestHandler<C, N>
9091
where
9192
N: NetworkPrimitives,
92-
C: BlockReader<Header = N::BlockHeader>,
93+
C: BlockReader<Header = N::BlockHeader> + 'static,
9394
{
9495
/// Returns the list of requested headers
95-
fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
96+
async fn get_headers_response(
97+
client: Arc<C>,
98+
header_transform: Option<Arc<dyn HeaderResponseTransform<N::BlockHeader>>>,
99+
request: GetBlockHeaders,
100+
) -> Vec<C::Header> {
96101
let GetBlockHeaders { start_block, limit, skip, direction } = request;
97102

98103
let mut headers = Vec::new();
99104

100105
let mut block: BlockHashOrNumber = match start_block {
101106
BlockHashOrNumber::Hash(start) => start.into(),
102107
BlockHashOrNumber::Number(num) => {
103-
let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
104-
return headers
105-
};
108+
let Some(hash) = client.block_hash(num).unwrap_or_default() else { return headers };
106109
hash.into()
107110
}
108111
};
@@ -111,7 +114,7 @@ where
111114
let mut total_bytes = 0;
112115

113116
for _ in 0..limit {
114-
if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
117+
if let Some(header) = client.header_by_hash_or_number(block).unwrap_or_default() {
115118
let number = header.number();
116119
let parent_hash = header.parent_hash();
117120

@@ -153,8 +156,8 @@ where
153156
}
154157

155158
// TODO: remove this once we deprecated l2geth
156-
if let Some(ref header_transform) = self.header_transform {
157-
headers = headers.into_iter().map(|h| header_transform.map(h)).collect()
159+
if let Some(ref header_transform) = header_transform {
160+
return join_all(headers.into_iter().map(|h| header_transform.map(h))).await;
158161
}
159162

160163
headers
@@ -165,10 +168,14 @@ where
165168
_peer_id: PeerId,
166169
request: GetBlockHeaders,
167170
response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
168-
) {
171+
) -> impl Future<Output = ()> + 'static {
169172
self.metrics.eth_headers_requests_received_total.increment(1);
170-
let headers = self.get_headers_response(request);
171-
let _ = response.send(Ok(BlockHeaders(headers)));
173+
let client = self.client.clone();
174+
let header_transform = self.header_transform.clone();
175+
async move {
176+
let headers = Self::get_headers_response(client, header_transform, request).await;
177+
let _ = response.send(Ok(BlockHeaders(headers)));
178+
}
172179
}
173180

174181
fn on_bodies_request(
@@ -267,7 +274,8 @@ where
267274
N: NetworkPrimitives,
268275
C: BlockReader<Block = N::Block, Receipt = N::Receipt>
269276
+ HeaderProvider<Header = N::BlockHeader>
270-
+ Unpin,
277+
+ Unpin
278+
+ 'static,
271279
{
272280
type Output = ();
273281

@@ -284,7 +292,8 @@ where
284292
|incoming| {
285293
match incoming {
286294
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
287-
this.on_headers_request(peer_id, request, response)
295+
let future = this.on_headers_request(peer_id, request, response);
296+
tokio::spawn(future);
288297
}
289298
IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
290299
this.on_bodies_request(peer_id, request, response)

crates/net/network/src/transform/header.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,18 @@ impl<H: BlockHeader> HeaderTransform<H> for () {
1313
header
1414
}
1515
}
16+
17+
/// An instance of the trait applies a mapping to headers that are being sent to a peer in response
18+
/// to a request.
19+
#[async_trait::async_trait]
20+
pub trait HeaderResponseTransform<H: BlockHeader>: std::fmt::Debug + Send + Sync {
21+
/// Applies a mapping to the response headers.
22+
async fn map(&self, header: H) -> H;
23+
}
24+
25+
#[async_trait::async_trait]
26+
impl<H: BlockHeader> HeaderResponseTransform<H> for () {
27+
async fn map(&self, header: H) -> H {
28+
header
29+
}
30+
}

crates/node/builder/src/builder/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
1717
use reth_exex::ExExContext;
1818
use reth_network::{
1919
transactions::{TransactionPropagationPolicy, TransactionsManagerConfig},
20-
transform::header::HeaderTransform,
20+
transform::header::HeaderResponseTransform,
2121
NetworkBuilder, NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
2222
NetworkPrimitives,
2323
};
@@ -586,10 +586,10 @@ where
586586
/// .extend_rpc_modules(|ctx| {
587587
/// // Access node components, so they can used by the CustomApi
588588
/// let pool = ctx.pool().clone();
589-
///
589+
///
590590
/// // Add custom RPC namespace
591591
/// ctx.modules.merge_configured(CustomApi { pool }.into_rpc())?;
592-
///
592+
///
593593
/// Ok(())
594594
/// })
595595
/// .build()?;
@@ -790,7 +790,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
790790
&self,
791791
builder: NetworkBuilder<(), (), N>,
792792
pool: Pool,
793-
request_transform: Option<Box<dyn HeaderTransform<N::BlockHeader>>>,
793+
request_transform: Option<Arc<dyn HeaderResponseTransform<N::BlockHeader>>>,
794794
) -> NetworkHandle<N>
795795
where
796796
N: NetworkPrimitives,
@@ -824,7 +824,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
824824
pool: Pool,
825825
tx_config: TransactionsManagerConfig,
826826
propagation_policy: Policy,
827-
request_transform: Option<Box<dyn HeaderTransform<N::BlockHeader>>>,
827+
request_transform: Option<Arc<dyn HeaderResponseTransform<N::BlockHeader>>>,
828828
) -> NetworkHandle<N>
829829
where
830830
N: NetworkPrimitives,

crates/optimism/consensus/src/proof.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ mod tests {
118118
];
119119

120120
for case in cases {
121-
let receipts = vec![
121+
let receipts = [
122122
// 0xb0d6ee650637911394396d81172bd1c637d568ed1fbddab0daddfca399c58b53
123123
OpReceipt::Deposit(OpDepositReceipt {
124124
inner: Receipt {

crates/payload/basic/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -852,10 +852,12 @@ pub trait PayloadBuilder: Send + Sync + Clone {
852852
/// Tells the payload builder how to react to payload request if there's no payload available yet.
853853
///
854854
/// This situation can occur if the CL requests a payload before the first payload has been built.
855+
#[derive(Default)]
855856
pub enum MissingPayloadBehaviour<Payload> {
856857
/// Await the regular scheduled payload process.
857858
AwaitInProgress,
858859
/// Race the in progress payload process with an empty payload.
860+
#[default]
859861
RaceEmptyPayload,
860862
/// Race the in progress payload process with this job.
861863
RacePayload(Box<dyn FnOnce() -> Result<Payload, PayloadBuilderError> + Send>),
@@ -873,12 +875,6 @@ impl<Payload> fmt::Debug for MissingPayloadBehaviour<Payload> {
873875
}
874876
}
875877

876-
impl<Payload> Default for MissingPayloadBehaviour<Payload> {
877-
fn default() -> Self {
878-
Self::RaceEmptyPayload
879-
}
880-
}
881-
882878
/// Checks if the new payload is better than the current best.
883879
///
884880
/// This compares the total fees of the blocks, higher is better.

crates/prune/types/src/mode.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,17 @@ use alloy_primitives::BlockNumber;
88
#[cfg_attr(any(test, feature = "reth-codec"), reth_codecs::add_arbitrary_tests(compact))]
99
#[cfg_attr(any(test, feature = "serde"), derive(serde::Serialize, serde::Deserialize))]
1010
#[cfg_attr(any(test, feature = "serde"), serde(rename_all = "lowercase"))]
11+
#[cfg_attr(any(test, feature = "test-utils"), derive(Default))]
1112
pub enum PruneMode {
1213
/// Prune all blocks.
14+
#[cfg_attr(any(test, feature = "test-utils"), default)]
1315
Full,
1416
/// Prune blocks before the `head-N` block number. In other words, keep last N + 1 blocks.
1517
Distance(u64),
1618
/// Prune blocks before the specified block number. The specified block number is not pruned.
1719
Before(BlockNumber),
1820
}
1921

20-
#[cfg(any(test, feature = "test-utils"))]
21-
impl Default for PruneMode {
22-
fn default() -> Self {
23-
Self::Full
24-
}
25-
}
26-
2722
impl PruneMode {
2823
/// Prune blocks up to the specified block number. The specified block number is also pruned.
2924
///

crates/prune/types/src/segment.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ use thiserror::Error;
88
#[cfg_attr(any(test, feature = "reth-codec"), derive(reth_codecs::Compact))]
99
#[cfg_attr(any(test, feature = "reth-codec"), reth_codecs::add_arbitrary_tests(compact))]
1010
#[cfg_attr(any(test, feature = "serde"), derive(serde::Serialize, serde::Deserialize))]
11+
#[cfg_attr(test, derive(Default))]
1112
pub enum PruneSegment {
1213
/// Prune segment responsible for the `TransactionSenders` table.
14+
#[cfg_attr(test, default)]
1315
SenderRecovery,
1416
/// Prune segment responsible for the `TransactionHashNumbers` table.
1517
TransactionLookup,
@@ -72,10 +74,3 @@ pub enum PruneSegmentError {
7274
#[error("the configuration provided for {0} is invalid")]
7375
Configuration(PruneSegment),
7476
}
75-
76-
#[cfg(test)]
77-
impl Default for PruneSegment {
78-
fn default() -> Self {
79-
Self::SenderRecovery
80-
}
81-
}

0 commit comments

Comments
 (0)