Skip to content

Commit eff4460

Browse files
committed
Add logic
1 parent 6b41f8c commit eff4460

File tree

3 files changed

+45
-4
lines changed

3 files changed

+45
-4
lines changed

crates/op-rbuilder/src/builders/flashblocks/payload.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use reth_node_api::{Block, NodePrimitives, PayloadBuilderError};
2424
use reth_optimism_consensus::{calculate_receipt_root_no_memo_optimism, isthmus};
2525
use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes};
2626
use reth_optimism_forks::OpHardforks;
27-
use reth_optimism_node::{OpBuiltPayload, OpPayloadBuilderAttributes};
27+
use reth_optimism_node::{OpBuiltPayload, OpEngineTypes, OpPayloadBuilderAttributes};
2828
use reth_optimism_primitives::{OpPrimitives, OpReceipt, OpTransactionSigned};
2929
use reth_payload_util::BestPayloadTransactions;
3030
use reth_primitives_traits::RecoveredBlock;
@@ -48,6 +48,8 @@ use std::{
4848
},
4949
time::Instant,
5050
};
51+
use std::sync::Mutex;
52+
use reth_payload_builder_primitives::Events;
5153
use tokio::sync::{
5254
mpsc,
5355
mpsc::{error::SendError, Sender},
@@ -119,6 +121,8 @@ pub struct OpPayloadBuilder<Pool, Client, BT> {
119121
/// The end of builder transaction type
120122
#[allow(dead_code)]
121123
pub builder_tx: BT,
124+
/// Builder events handle to send BuiltPayload events
125+
payload_builder_handle: Arc<Mutex<Option<tokio::sync::broadcast::Sender<Events<OpEngineTypes>>>>>
122126
}
123127

124128
impl<Pool, Client, BT> OpPayloadBuilder<Pool, Client, BT> {
@@ -129,6 +133,7 @@ impl<Pool, Client, BT> OpPayloadBuilder<Pool, Client, BT> {
129133
client: Client,
130134
config: BuilderConfig<FlashblocksConfig>,
131135
builder_tx: BT,
136+
payload_builder_handle: Arc<Mutex<Option<tokio::sync::broadcast::Sender<Events<OpEngineTypes>>>>>
132137
) -> eyre::Result<Self> {
133138
let metrics = Arc::new(OpRBuilderMetrics::default());
134139
let ws_pub = WebSocketPublisher::new(config.specific.ws_addr, Arc::clone(&metrics))?.into();
@@ -140,6 +145,7 @@ impl<Pool, Client, BT> OpPayloadBuilder<Pool, Client, BT> {
140145
config,
141146
metrics,
142147
builder_tx,
148+
payload_builder_handle,
143149
})
144150
}
145151
}
@@ -290,6 +296,8 @@ where
290296
let (payload, fb_payload) = build_block(&mut state, &ctx, &mut info)?;
291297

292298
best_payload.set(payload.clone());
299+
self.send_payload_to_engine(payload);
300+
293301
self.ws_pub
294302
.publish(&fb_payload)
295303
.map_err(PayloadBuilderError::other)?;
@@ -536,6 +544,7 @@ where
536544
.record(info.executed_transactions.len() as f64);
537545

538546
best_payload.set(new_payload.clone());
547+
self.send_payload_to_engine(new_payload);
539548
// Update bundle_state for next iteration
540549
total_gas_per_batch += gas_per_batch;
541550
if let Some(da_limit) = da_per_batch {
@@ -605,6 +614,27 @@ where
605614
span.record("flashblock_count", ctx.flashblock_index());
606615
}
607616

617+
/// Sends built payload via payload builder handle broadcast channel to the engine
618+
pub fn send_payload_to_engine(&self, payload: OpBuiltPayload) {
619+
// Send built payload as create one
620+
match self.payload_builder_handle.lock().as_deref() {
621+
Ok(Some(handle)) => {
622+
let res = handle.send(Events::BuiltPayload(payload.clone().into()));
623+
if let Err(e) = res {
624+
error!(
625+
message = "Failed to send payload via payload builder handle",
626+
error = ?e,
627+
);
628+
}
629+
},
630+
Ok(None) => error!(message = "Payload builder handle is not setup, skipping sending payload"),
631+
Err(e) => error!(
632+
message = "Failed to get access to payload builder handle",
633+
error = ?e.
634+
),
635+
}
636+
}
637+
608638
/// Spawn task that will send new flashblock level cancel token in steady intervals (first interval
609639
/// may vary if --flashblocks.dynamic enabled)
610640
pub fn spawn_timer_task(

crates/op-rbuilder/src/builders/flashblocks/service.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::{Arc, Mutex};
12
use super::{payload::OpPayloadBuilder, FlashblocksConfig};
23
use crate::{
34
builders::{
@@ -28,12 +29,17 @@ impl FlashblocksServiceBuilder {
2829
Pool: PoolBounds,
2930
BT: BuilderTx + Unpin + Clone + Send + Sync + 'static,
3031
{
32+
33+
let handle = Arc::new(Mutex::new(None));
34+
let payload_builder_handle = handle.clone();
35+
3136
let payload_builder = OpPayloadBuilder::new(
3237
OpEvmConfig::optimism(ctx.chain_spec()),
3338
pool,
3439
ctx.provider().clone(),
3540
self.0.clone(),
3641
builder_tx,
42+
handle,
3743
)?;
3844

3945
let payload_job_config = BasicPayloadJobGeneratorConfig::default();
@@ -50,6 +56,8 @@ impl FlashblocksServiceBuilder {
5056
let (payload_service, payload_builder) =
5157
PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
5258

59+
payload_builder_handle.lock()?.replace(payload_service.payload_events_handle());
60+
5361
ctx.task_executor()
5462
.spawn_critical("custom payload builder service", Box::pin(payload_service));
5563

crates/op-rbuilder/src/builders/generator.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use reth_basic_payload_builder::{
88
BasicPayloadJobGeneratorConfig, HeaderForPayload, PayloadConfig, PrecachedState,
99
};
1010
use reth_node_api::{NodePrimitives, PayloadBuilderAttributes, PayloadKind};
11-
use reth_payload_builder::{
12-
KeepPayloadJobAlive, PayloadBuilderError, PayloadJob, PayloadJobGenerator,
13-
};
11+
use reth_payload_builder::{KeepPayloadJobAlive, PayloadBuilderError, PayloadBuilderHandle, PayloadJob, PayloadJobGenerator};
1412
use reth_payload_primitives::BuiltPayload;
1513
use reth_primitives_traits::HeaderTy;
1614
use reth_provider::CanonStateNotification;
@@ -199,6 +197,7 @@ where
199197
deadline,
200198
build_complete: None,
201199
cached_reads: self.maybe_pre_cached(parent_header.hash()),
200+
payload_builder_handle: self.payload_builder_handle.clone(),
202201
};
203202

204203
job.spawn_build_job();
@@ -236,6 +235,9 @@ use std::{
236235
pin::Pin,
237236
task::{Context, Poll},
238237
};
238+
use tokio::sync::broadcast::Sender;
239+
use reth_optimism_node::OpEngineTypes;
240+
use reth_payload_builder_primitives::Events;
239241

240242
/// A [PayloadJob] that builds empty blocks.
241243
pub struct BlockPayloadJob<Tasks, Builder>
@@ -261,6 +263,7 @@ where
261263
/// This is used to avoid reading the same state over and over again when new attempts are
262264
/// triggered, because during the building process we'll repeatedly execute the transactions.
263265
pub(crate) cached_reads: Option<CachedReads>,
266+
pub(crate) payload_builder_handle: Arc<Mutex<Option<Sender<Events<OpEngineTypes>>>>>,
264267
}
265268

266269
impl<Tasks, Builder> PayloadJob for BlockPayloadJob<Tasks, Builder>

0 commit comments

Comments
 (0)