Skip to content

Commit b34f6dc

Browse files
Merge pull request #20 from Irys-xyz/fix/p2p
Fix/p2p
2 parents d601a89 + aa5bceb commit b34f6dc

File tree

15 files changed

+246
-100
lines changed

15 files changed

+246
-100
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -609,8 +609,8 @@ tracy-client = "0.17.3"
609609
irys-primitives = { path = "../../crates/primitives" }
610610

611611
[patch.crates-io]
612-
revm = { path = "../.../revm/crates/revm" }
613-
revm-primitives = { path = "../.../revm/crates/primitives" }
612+
revm = { path = "../revm/crates/revm" }
613+
revm-primitives = { path = "../revm/crates/primitives" }
614614

615615
#alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "8c499409"}
616616
#alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "8c499409"}

crates/e2e-test-utils/src/engine_api.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,19 @@ impl<E: EngineTypes> EngineApiTestContext<E> {
4141
Ok(self.engine_api_client.request("engine_getPayloadV1Irys", (payload_id,)).await?)
4242
}
4343

44+
pub async fn build_payload_v1_irys(
45+
&self,
46+
parent: B256,
47+
payload_attributes: E::PayloadAttributes,
48+
) -> eyre::Result<E::ExecutionPayloadV1Irys> {
49+
Ok(EngineApiClient::<E>::build_new_payload_irys(
50+
&self.engine_api_client,
51+
parent,
52+
payload_attributes,
53+
)
54+
.await?)
55+
}
56+
4457
/// Submits a payload to the engine api
4558
pub async fn submit_payload(
4659
&self,

crates/e2e-test-utils/src/node.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,71 @@ where
129129
Ok((built_payload, eth_attr))
130130
}
131131

132+
pub async fn new_payload_irys(
133+
&mut self,
134+
// attributes: Engine::PayloadBuilderAttributes,
135+
attributes_generator: impl Fn(u64) -> Engine::PayloadBuilderAttributes,
136+
) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
137+
where
138+
<Engine as EngineTypes>::ExecutionPayloadV1Irys:
139+
From<Engine::BuiltPayload> + PayloadEnvelopeExt,
140+
{
141+
// trigger new payload building draining the pool
142+
// self.payload.payload_builder.new_payload(attributes.clone()).await?;
143+
144+
// let eth_attr = attributes;
145+
146+
let eth_attr = self.payload.new_payload(attributes_generator).await.unwrap();
147+
148+
// first event is the payload attributes
149+
// self.payload.expect_attr_event(eth_attr.clone()).await?;
150+
// wait for the payload builder to have finished building
151+
let p2 = self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
152+
// trigger resolve payload via engine api
153+
let _ = self.engine_api.get_payload_v1_irys(eth_attr.payload_id()).await?;
154+
// ensure we're also receiving the built payload as event
155+
// let built_payload = self.payload.expect_built_payload().await?;
156+
// self.
157+
// Ok((built_payload, eth_attr))
158+
Ok((/* execution_payload */ p2, eth_attr))
159+
}
160+
161+
pub async fn new_payload_irys2(
162+
&mut self,
163+
parent: B256,
164+
attributes: Engine::PayloadAttributes,
165+
) -> eyre::Result<(
166+
Engine::ExecutionPayloadV1Irys,
167+
Engine::BuiltPayload,
168+
Engine::PayloadBuilderAttributes,
169+
)>
170+
where
171+
<Engine as EngineTypes>::ExecutionPayloadV1Irys:
172+
From<Engine::BuiltPayload> + PayloadEnvelopeExt,
173+
{
174+
let payload_builder_attributes =
175+
<Engine as reth_node_builder::PayloadTypes>::PayloadBuilderAttributes::try_new(
176+
parent, attributes,
177+
)
178+
.expect("unable to build PayloadBuilderAttributes");
179+
use reth_payload_primitives::PayloadBuilder;
180+
// trigger new payload building draining the pool
181+
self.payload.payload_builder.new_payload(payload_builder_attributes.clone()).await?;
182+
183+
let eth_attr = payload_builder_attributes;
184+
// first event is the payload attributes
185+
// self.payload.expect_attr_event(eth_attr.clone()).await?;
186+
// wait for the payload builder to have finished building
187+
let built_payload = self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
188+
// trigger resolve payload via engine api
189+
let execution_payload = self.engine_api.get_payload_v1_irys(eth_attr.payload_id()).await?;
190+
// ensure we're also receiving the built payload as event
191+
// let built_payload = self.payload.expect_built_payload().await?;
192+
// self.
193+
// Ok((built_payload, eth_attr))
194+
Ok((execution_payload, built_payload, eth_attr))
195+
}
196+
132197
/// Advances the node forward one block
133198
pub async fn advance_block(
134199
&mut self,
@@ -157,6 +222,34 @@ where
157222
Ok((payload, eth_attr))
158223
}
159224

225+
/// Advances the node forward one block
226+
pub async fn advance_block_irys(
227+
&mut self,
228+
versioned_hashes: Vec<B256>,
229+
attributes_generator: impl Fn(u64) -> Engine::PayloadBuilderAttributes,
230+
) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
231+
where
232+
<Engine as EngineTypes>::ExecutionPayloadV1Irys:
233+
From<Engine::BuiltPayload> + PayloadEnvelopeExt,
234+
{
235+
let (payload, eth_attr) = self.new_payload_irys(attributes_generator).await?;
236+
237+
let block_hash = self
238+
.engine_api
239+
.submit_payload(
240+
payload.clone(),
241+
eth_attr.clone(),
242+
PayloadStatusEnum::Valid,
243+
versioned_hashes,
244+
)
245+
.await?;
246+
247+
// trigger forkchoice update via engine api to commit the block to the blockchain
248+
self.engine_api.update_forkchoice(block_hash, block_hash).await?;
249+
250+
Ok((payload, eth_attr))
251+
}
252+
160253
/// Waits for block to be available on node.
161254
pub async fn wait_block(
162255
&self,
@@ -221,6 +314,37 @@ where
221314
let tx = head.tip().transactions().next();
222315
assert_eq!(tx.unwrap().hash().as_slice(), tip_tx_hash.as_slice());
223316

317+
loop {
318+
// wait for the block to commit
319+
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
320+
if let Some(latest_block) =
321+
self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)?
322+
{
323+
if latest_block.number == block_number {
324+
// make sure the block hash we submitted via FCU engine api is the new latest
325+
// block using an RPC call
326+
assert_eq!(latest_block.hash_slow(), block_hash);
327+
break;
328+
}
329+
}
330+
}
331+
Ok(())
332+
}
333+
/// Asserts that a new block has been added to the blockchain
334+
/// and the tx has been included in the block.
335+
///
336+
/// Does NOT work for pipeline since there's no stream notification!
337+
pub async fn assert_new_block2(
338+
&self,
339+
block_hash: B256,
340+
block_number: BlockNumber,
341+
) -> eyre::Result<()> {
342+
// get head block from notifications stream and verify the tx has been pushed to the
343+
// pool is actually present in the canonical block
344+
// let head = self.engine_api.canonical_stream.next().await.unwrap();
345+
// let tx = head.tip().transactions().next();
346+
// assert_eq!(tx.unwrap().hash().as_slice(), tip_tx_hash.as_slice());
347+
224348
loop {
225349
// wait for the block to commit
226350
tokio::time::sleep(std::time::Duration::from_millis(20)).await;

crates/e2e-test-utils/src/payload.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ impl<E: EngineTypes> PayloadTestContext<E> {
4848
}
4949

5050
/// Wait until the best built payload is ready
51-
pub async fn wait_for_built_payload(&self, payload_id: PayloadId) {
51+
pub async fn wait_for_built_payload(
52+
&self,
53+
payload_id: PayloadId,
54+
) -> <E as reth_node_builder::PayloadTypes>::BuiltPayload {
5255
loop {
5356
// let payload = self.payload_builder.best_payload(payload_id).await.unwrap().unwrap();
5457
match self.payload_builder.best_payload(payload_id).await {
@@ -59,7 +62,7 @@ impl<E: EngineTypes> PayloadTestContext<E> {
5962
continue;
6063
} else {
6164
trace!("got payload");
62-
break;
65+
return v;
6366
}
6467
}
6568
Err(e) => {

crates/ethereum/evm/src/execute.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,19 +166,20 @@ where
166166
// Default::default(),
167167
// ))
168168
// .build();
169-
let prev = evm.context.evm.inner.journaled_state.checkpoint();
170-
// TODO: fix this
169+
170+
// let prev = evm.context.evm.inner.journaled_state.checkpoint();
171+
171172
let shadow_exec =
172173
apply_block_shadows(block.body.shadows.as_ref(), &mut evm).map_err(move |err| {
173174
let new_err = err.map_db_err(|e| e.into());
174175
BlockValidationError::EVM { hash: B256::ZERO, error: Box::new(new_err) }
175176
})?;
176-
info!("shadow exec: {:#?}", &shadow_exec);
177+
info!("shadow exec2: {:#?}", &shadow_exec);
177178
let ss = evm.context.evm.inner.journaled_state.state.clone();
178179

179180
evm.db_mut().commit(ss);
180181

181-
evm.context.evm.inner.journaled_state.checkpoint_revert(prev);
182+
// evm.context.evm.inner.journaled_state.checkpoint_revert(prev);
182183

183184
// execute transactions
184185
let mut cumulative_gas_used = 0;

crates/net/eth-wire-types/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ workspace = true
1616
reth-chainspec.workspace = true
1717
reth-codecs-derive.workspace = true
1818
reth-primitives.workspace = true
19-
2019
# ethereum
2120
alloy-chains = { workspace = true, features = ["rlp"] }
2221
alloy-eips.workspace = true

crates/net/eth-wire-types/src/message.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use super::{
1212
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, Transactions,
1313
};
1414
use crate::{EthVersion, SharedTransactions};
15-
1615
use alloy_primitives::bytes::{Buf, BufMut};
1716
use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
1817
use std::{fmt::Debug, sync::Arc};
@@ -46,7 +45,6 @@ impl ProtocolMessage {
4645
/// Create a new `ProtocolMessage` from a message type and message rlp bytes.
4746
pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
4847
let message_type = EthMessageID::decode(buf)?;
49-
5048
let message = match message_type {
5149
EthMessageID::Status => EthMessage::Status(Status::decode(buf)?),
5250
EthMessageID::NewBlockHashes => {
@@ -91,14 +89,14 @@ impl ProtocolMessage {
9189
}
9290
EthMessageID::GetNodeData => {
9391
if version >= EthVersion::Eth67 {
94-
return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
92+
return Err(MessageError::Invalid(version, EthMessageID::GetNodeData));
9593
}
9694
let request_pair = RequestPair::<GetNodeData>::decode(buf)?;
9795
EthMessage::GetNodeData(request_pair)
9896
}
9997
EthMessageID::NodeData => {
10098
if version >= EthVersion::Eth67 {
101-
return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
99+
return Err(MessageError::Invalid(version, EthMessageID::GetNodeData));
102100
}
103101
let request_pair = RequestPair::<NodeData>::decode(buf)?;
104102
EthMessage::NodeData(request_pair)
@@ -219,7 +217,7 @@ pub enum EthMessage {
219217

220218
impl EthMessage {
221219
/// Returns the message's ID.
222-
pub const fn message_id(&self) -> EthMessageID {
220+
pub fn message_id(&self) -> EthMessageID {
223221
match self {
224222
Self::Status(_) => EthMessageID::Status,
225223
Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
@@ -483,7 +481,7 @@ where
483481
// RequestPair
484482
let consumed_len = initial_length - buf.len();
485483
if consumed_len != header.payload_length {
486-
return Err(alloy_rlp::Error::UnexpectedLength)
484+
return Err(alloy_rlp::Error::UnexpectedLength);
487485
}
488486

489487
Ok(Self { request_id, message })

crates/net/eth-wire/src/p2pstream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,10 +620,10 @@ where
620620
Poll::Ready(Err(err)) => break Poll::Ready(Err(err.into())),
621621
Poll::Ready(Ok(())) => {
622622
let Some(message) = this.outgoing_messages.pop_front() else {
623-
break Poll::Ready(Ok(()))
623+
break Poll::Ready(Ok(()));
624624
};
625625
if let Err(err) = this.inner.as_mut().start_send(message) {
626-
break Poll::Ready(Err(err.into()))
626+
break Poll::Ready(Err(err.into()));
627627
}
628628
}
629629
}

crates/net/network/src/eth_requests.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use reth_primitives::{BlockBody, Header};
2121
use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider};
2222
use tokio::sync::{mpsc::Receiver, oneshot};
2323
use tokio_stream::wrappers::ReceiverStream;
24+
use tracing::{debug, warn};
2425

2526
use crate::{
2627
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
@@ -133,7 +134,7 @@ where
133134
headers.push(header);
134135

135136
if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
136-
break
137+
break;
137138
}
138139
} else {
139140
break;
@@ -164,18 +165,19 @@ where
164165
let mut bodies = Vec::new();
165166

166167
let mut total_bytes = 0;
168+
debug!("Getting block bodies {:?}", &request.0);
167169

168170
for hash in request.0 {
169171
if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
170172
let body: BlockBody = block.into();
171-
172173
total_bytes += body.length();
173174
bodies.push(body);
174175

175176
if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
176-
break
177+
break;
177178
}
178179
} else {
180+
warn!("Unable to get block body {}", &hash);
179181
break;
180182
}
181183
}
@@ -208,7 +210,7 @@ where
208210
receipts.push(receipt);
209211

210212
if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
211-
break
213+
break;
212214
}
213215
} else {
214216
break;

0 commit comments

Comments
 (0)