Skip to content

Commit 6fab16d

Browse files
authored
refactor: parallelize collect_event for tipset range (#6881)
1 parent 715d34f commit 6fab16d

1 file changed

Lines changed: 45 additions & 11 deletions

File tree

  • src/rpc/methods/eth/filter

src/rpc/methods/eth/filter/mod.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
//! - **Event Filter**: Captures blockchain events, such as smart contract log events, emitted by specific actors.
1515
//! - **TipSet Filter**: Tracks changes in the blockchain's tipset (the latest set of blocks).
1616
//! - **Mempool Filter**: Monitors the Ethereum mempool for new pending transactions that meet certain criteria.
17+
1718
pub mod event;
1819
pub mod mempool;
1920
mod store;
@@ -44,6 +45,7 @@ use crate::utils::misc::env::env_or_default;
4445
use ahash::AHashMap as HashMap;
4546
use anyhow::{Context, Error, anyhow, bail, ensure};
4647
use cid::Cid;
48+
use futures::{TryStreamExt as _, stream::FuturesOrdered};
4749
use fvm_ipld_blockstore::Blockstore;
4850
use fvm_ipld_encoding::IPLD_RAW;
4951
use serde::*;
@@ -261,13 +263,41 @@ impl EthEventHandler {
261263
)
262264
}
263265

266+
pub async fn collect_events_for_tipsets<DB: Blockstore + Send + Sync + 'static>(
267+
ctx: &Ctx<DB>,
268+
tipsets: impl Iterator<Item = Tipset>,
269+
spec: Option<&impl Matcher>,
270+
skip_event: SkipEvent,
271+
collected_events: &mut Vec<CollectedEvent>,
272+
) -> anyhow::Result<()> {
273+
let mut tasks = FuturesOrdered::new();
274+
for tipset in tipsets {
275+
tasks.push_back(async move {
276+
let mut events = vec![];
277+
Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await?;
278+
anyhow::Ok(events)
279+
});
280+
}
281+
let max_filter_results = ctx.eth_event_handler.max_filter_results;
282+
while let Some(events) = tasks.try_next().await? {
283+
let remaining = max_filter_results.saturating_sub(collected_events.len());
284+
ensure!(
285+
events.len() <= remaining,
286+
"filter matches too many events (maximum {max_filter_results}), try a more restricted filter"
287+
);
288+
collected_events.extend(events);
289+
}
290+
Ok(())
291+
}
292+
264293
pub async fn collect_events<DB: Blockstore + Send + Sync + 'static>(
265294
ctx: &Ctx<DB>,
266295
tipset: &Tipset,
267296
spec: Option<&impl Matcher>,
268297
skip_event: SkipEvent,
269298
collected_events: &mut Vec<CollectedEvent>,
270299
) -> anyhow::Result<()> {
300+
let max_filter_results = ctx.eth_event_handler.max_filter_results;
271301
let height = tipset.epoch();
272302
let tipset_key = tipset.key();
273303
let ExecutedTipset {
@@ -338,9 +368,10 @@ impl EthEventHandler {
338368
msg_idx: msg_idx as u64,
339369
msg_cid: message.cid(),
340370
};
341-
if collected_events.len() >= ctx.eth_event_handler.max_filter_results {
342-
bail!("filter matches too many events, try a more restricted filter");
343-
}
371+
ensure!(
372+
collected_events.len() <= max_filter_results,
373+
"filter matches too many events (maximum {max_filter_results} allowed), try a more restricted filter"
374+
);
344375
collected_events.push(ce);
345376
}
346377
}
@@ -399,19 +430,22 @@ impl EthEventHandler {
399430
} else {
400431
*range.end()
401432
};
402-
403433
let max_tipset = ctx.chain_index().tipset_by_height(
404434
max_height,
405435
ctx.chain_store().heaviest_tipset(),
406436
ResolveNullTipset::TakeOlder,
407437
)?;
408-
for tipset in max_tipset
409-
.chain(&ctx.store())
410-
.take_while(|ts| ts.epoch() >= *range.start())
411-
{
412-
Self::collect_events(ctx, &tipset, Some(pf), skip_event, &mut collected_events)
413-
.await?;
414-
}
438+
let tipsets = max_tipset
439+
.chain(ctx.store())
440+
.take_while(|ts| ts.epoch() >= *range.start());
441+
Self::collect_events_for_tipsets(
442+
ctx,
443+
tipsets,
444+
Some(pf),
445+
skip_event,
446+
&mut collected_events,
447+
)
448+
.await?;
415449
}
416450
}
417451

0 commit comments

Comments
 (0)