Skip to content

Commit 05870a0

Browse files
greged93frisitano
andauthored
feat: implement improvement listed during knowledge sharing (#414)
Co-authored-by: frisitano <[email protected]>
1 parent a0643df commit 05870a0

File tree

5 files changed

+30
-12
lines changed

5 files changed

+30
-12
lines changed

crates/codec/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub enum Codec {
4141

4242
impl Codec {
4343
/// Decodes the input data and returns the decoded [`Batch`].
44-
pub fn decode<T: CommitDataSource>(input: &T) -> Result<Batch, CodecError> {
44+
pub fn decode<T: CommitDataSource>(input: T) -> Result<Batch, CodecError> {
4545
let calldata = input.calldata();
4646
let version = get_codec_version(calldata)?;
4747

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
use alloy_eips::eip4844::Blob;
21
use alloy_primitives::Bytes;
32
use scroll_codec::CommitDataSource;
43

54
/// Holds the data for the codec.
6-
pub(crate) struct CodecDataSource<'a> {
7-
pub(crate) calldata: &'a Bytes,
8-
pub(crate) blob: Option<&'a Blob>,
5+
pub(crate) struct CodecDataSource<Calldata, Blob> {
6+
pub(crate) calldata: Calldata,
7+
pub(crate) blob: Option<Blob>,
98
}
109

11-
impl<'a> CommitDataSource for CodecDataSource<'a> {
10+
impl<Calldata: AsRef<Bytes>, Blob: AsRef<alloy_eips::eip4844::Blob>> CommitDataSource
11+
for CodecDataSource<Calldata, Blob>
12+
{
1213
fn calldata(&self) -> &Bytes {
13-
self.calldata
14+
self.calldata.as_ref()
1415
}
1516

16-
fn blob(&self) -> Option<&Blob> {
17-
self.blob
17+
fn blob(&self) -> Option<&alloy_eips::eip4844::Blob> {
18+
self.blob.as_ref().map(|b| b.as_ref())
1819
}
1920
}

crates/derivation-pipeline/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures::{stream::FuturesOrdered, Stream, StreamExt};
1010
use rollup_node_primitives::{BatchCommitData, BatchInfo, BatchStatus, L1MessageEnvelope};
1111
use rollup_node_providers::L1Provider;
1212
use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
13-
use scroll_codec::{decoding::payload::PayloadData, Codec};
13+
use scroll_codec::{decoding::payload::PayloadData, Codec, CodecError, DecodingError};
1414
use scroll_db::{Database, DatabaseReadOperations, L1MessageKey};
1515
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
1616

@@ -299,8 +299,15 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
299299
} else {
300300
None
301301
};
302-
let data = CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_deref() };
303-
let decoded = Codec::decode(&data)?;
302+
303+
let data = CodecDataSource { calldata: batch.calldata.clone(), blob };
304+
305+
let decoded =
306+
tokio::task::spawn_blocking(move || Codec::decode(data)).await.map_err(|err| {
307+
DerivationPipelineError::Codec(CodecError::Decoding(DecodingError::Other(Box::new(
308+
err,
309+
))))
310+
})??;
304311

305312
// set the cursor for the l1 provider.
306313
let payload_data = &decoded.data;

crates/watcher/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,7 @@ pub enum FilterLogError {
5555
/// The log is missing a transaction hash.
5656
#[error("unknown transaction hash for log")]
5757
MissingTransactionHash,
58+
/// Invalid extracted notification length.
59+
#[error("expected {0} notifications, got {1}")]
60+
InvalidNotificationCount(usize, usize),
5861
}

crates/watcher/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,13 @@ where
341341
notifications.push(system_contract_update);
342342
}
343343

344+
if logs.len() != notifications.len() {
345+
return Err(L1WatcherError::Logs(FilterLogError::InvalidNotificationCount(
346+
logs.len(),
347+
notifications.len(),
348+
)))
349+
}
350+
344351
// send all notifications on the channel.
345352
self.notify_all(notifications).await?;
346353

0 commit comments

Comments
 (0)