Skip to content

Commit 36c80b8

Browse files
committed
feat: decoding methods
1 parent 81e4e5c commit 36c80b8

File tree

2 files changed

+58
-10
lines changed

2 files changed

+58
-10
lines changed

crates/blobber/src/cache.rs

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use crate::{BlobFetcherError, Blobs, FetchResult};
2-
use alloy::primitives::B256;
3-
use reth::network::cache::LruMap;
2+
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
3+
use alloy::primitives::{keccak256, Bytes, B256};
44
use reth::transaction_pool::TransactionPool;
5+
use reth::{network::cache::LruMap, primitives::Receipt};
6+
use signet_extract::ExtractedEvent;
7+
use signet_zenith::Zenith::BlockSubmitted;
8+
use signet_zenith::ZenithBlock;
59
use std::{
610
sync::{Arc, Mutex},
711
time::Duration,
@@ -19,7 +23,7 @@ const BETWEEN_RETRIES: Duration = Duration::from_millis(250);
1923
/// retrieving blobs.
2024
#[derive(Debug)]
2125
enum CacheInst {
22-
Retrieve { slot: u64, tx_hash: B256, version_hashes: Vec<B256>, resp: oneshot::Sender<Blobs> },
26+
Retrieve { slot: usize, tx_hash: B256, version_hashes: Vec<B256>, resp: oneshot::Sender<Blobs> },
2327
}
2428

2529
/// Handle for the cache.
@@ -38,7 +42,7 @@ impl CacheHandle {
3842
/// fetch blobs if they are not found in the cache.
3943
pub async fn fetch_blobs(
4044
&self,
41-
slot: u64,
45+
slot: usize,
4246
tx_hash: B256,
4347
version_hashes: Vec<B256>,
4448
) -> FetchResult<Blobs> {
@@ -48,13 +52,53 @@ impl CacheHandle {
4852

4953
receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash))
5054
}
55+
56+
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
57+
/// Zenith block data.
58+
pub async fn fetch_and_decode(
59+
&self,
60+
slot: usize,
61+
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
62+
) -> FetchResult<Bytes> {
63+
let tx_hash = extract.tx_hash();
64+
let versioned_hashes = extract
65+
.tx
66+
.as_eip4844()
67+
.ok_or_else(BlobFetcherError::non_4844_transaction)?
68+
.blob_versioned_hashes()
69+
.expect("tx is eip4844");
70+
71+
let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;
72+
73+
SimpleCoder::default()
74+
.decode_all(blobs.as_ref())
75+
.ok_or_else(BlobFetcherError::blob_decode_error)?
76+
.into_iter()
77+
.find(|data| keccak256(data) == extract.block_data_hash())
78+
.map(Into::into)
79+
.ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash))
80+
}
81+
82+
/// Fetch the blobs, decode them, and construct a Zenith block from the
83+
/// header and data.
84+
pub async fn signet_block(
85+
&self,
86+
host_block_number: u64,
87+
slot: usize,
88+
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
89+
) -> FetchResult<ZenithBlock> {
90+
let header = extract.ru_header(host_block_number);
91+
self.fetch_and_decode(slot, extract)
92+
.await
93+
.map(|buf| ZenithBlock::from_header_and_data(header, buf))
94+
}
5195
}
5296

5397
/// Retrieves blobs and stores them in a cache for later use.
5498
pub struct BlobCacher<Pool> {
5599
fetcher: crate::BlobFetcher<Pool>,
56100

57-
cache: Mutex<LruMap<(u64, B256), Blobs>>,
101+
cache: Mutex<LruMap<(usize, B256), Blobs>>,
58102
}
59103

60104
impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
@@ -73,7 +117,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
73117
#[instrument(skip(self), target = "signet_blobber::BlobCacher", fields(retries = FETCH_RETRIES))]
74118
async fn fetch_blobs(
75119
&self,
76-
slot: u64,
120+
slot: usize,
77121
tx_hash: B256,
78122
versioned_hashes: Vec<B256>,
79123
) -> FetchResult<Blobs> {

crates/blobber/src/fetch.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ where
140140
/// searching for the expected hash
141141
async fn get_and_decode_blobs(
142142
&self,
143+
slot: usize,
143144
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
144-
slot: u64,
145145
) -> FetchResult<Vec<u8>> {
146146
debug_assert!(extract.tx.is_eip4844(), "Transaction must be of type EIP-4844");
147147
let hash = extract.tx.tx_hash();
@@ -165,7 +165,7 @@ where
165165
#[instrument(skip(self, versioned_hashes))]
166166
pub(crate) async fn fetch_blobs(
167167
&self,
168-
slot: u64,
168+
slot: usize,
169169
tx_hash: B256,
170170
versioned_hashes: &[B256],
171171
) -> FetchResult<Blobs> {
@@ -224,7 +224,11 @@ where
224224

225225
/// Queries the connected consensus client for the blob transaction
226226
#[instrument(skip_all, err)]
227-
async fn get_blobs_from_cl(&self, slot: u64, versioned_hashes: &[B256]) -> FetchResult<Blobs> {
227+
async fn get_blobs_from_cl(
228+
&self,
229+
slot: usize,
230+
versioned_hashes: &[B256],
231+
) -> FetchResult<Blobs> {
228232
if let Some(url) = &self.cl_url {
229233
let url = url.join(&format!("/eth/v1/beacon/blob_sidecars/{slot}")).map_err(|err| {
230234
BlobFetcherError::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
@@ -261,7 +265,7 @@ where
261265
.slot_ending_at(host_block_timestamp)
262266
.expect("host chain has started");
263267

264-
let block_data = self.get_and_decode_blobs(extract, slot as u64).await?;
268+
let block_data = self.get_and_decode_blobs(slot, extract).await?;
265269
Ok(ZenithBlock::from_header_and_data(header, block_data))
266270
}
267271

0 commit comments

Comments
 (0)