Skip to content

Commit

Permalink
rpc-alt: testing query transaction + pruning
Browse files Browse the repository at this point in the history
## Description

Adding a test to make sure that transaction querying continues to work
in the presence of pruning, because there is some tricky logic related
to applying a lowerbound to the filtered results.

At the same time, I also created a testing-specific config for the
indexer, which speeds up polling but reduces concurrency -- this shaves
off about 10 to 15s from local testing runs overall, and 30s for the
newly added test (which generates a lot of checkpoints).

## Test plan

```
sui$ cargo nextest run -p sui-indexer-alt-e2e-tests \
 --test transaction_pruning_tests
```
  • Loading branch information
amnn committed Feb 14, 2025
1 parent a8d0754 commit 66a1c18
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 16 deletions.
62 changes: 58 additions & 4 deletions crates/sui-indexer-alt-e2e-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use anyhow::{bail, Context};
use diesel::{ExpressionMethods, QueryDsl};
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
use diesel_async::RunQueryDsl;
use simulacrum::Simulacrum;
use sui_indexer_alt::{config::IndexerConfig, setup_indexer};
Expand Down Expand Up @@ -192,7 +192,8 @@ impl FullCluster {
self.offchain.latest_checkpoint().await
}

/// Waits until the indexer has caught up to the given checkpoint, or the timeout is reached.
/// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
/// reached (an error).
pub async fn wait_for_checkpoint(
&self,
checkpoint: u64,
Expand All @@ -201,6 +202,19 @@ impl FullCluster {
self.offchain.wait_for_checkpoint(checkpoint, timeout).await
}

/// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
/// `pipeline`, or the `timeout` is reached (an error).
pub async fn wait_for_pruner(
&self,
pipeline: &str,
checkpoint: u64,
timeout: Duration,
) -> Result<(), Elapsed> {
self.offchain
.wait_for_pruner(pipeline, checkpoint, timeout)
.await
}

/// Triggers cancellation of all downstream services, waits for them to stop, cleans up the
/// temporary database, and the temporary directory used for ingestion.
pub async fn stopped(self) {
Expand Down Expand Up @@ -320,7 +334,28 @@ impl OffchainCluster {
Ok(latest.into_values().min().map(|l| l as u64))
}

/// Waits until the indexer has caught up to the given checkpoint, or the timeout is reached.
/// TODO: Docs
pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
use watermarks::dsl as w;

let mut conn = self
.db
.connect()
.await
.context("Failed to connect to database")?;

let latest: Option<i64> = w::watermarks
.select(w::reader_lo)
.filter(w::pipeline.eq(pipeline))
.first(&mut conn)
.await
.optional()?;

Ok(latest.map(|l| l as u64))
}

/// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
/// reached (an error).
pub async fn wait_for_checkpoint(
&self,
checkpoint: u64,
Expand All @@ -331,13 +366,32 @@ impl OffchainCluster {
if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
break;
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
})
.await
}

/// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
/// `pipeline`, or the `timeout` is reached (an error).
pub async fn wait_for_pruner(
&self,
pipeline: &str,
checkpoint: u64,
timeout: Duration,
) -> Result<(), Elapsed> {
tokio::time::timeout(timeout, async move {
loop {
if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
break;
} else {
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
}).await
}

/// Triggers cancellation of all downstream services, waits for them to stop, and cleans up the
/// temporary database.
pub async fn stopped(self) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ impl SuiNSCluster {
sim,
IndexerArgs::default(),
SystemPackageTaskArgs::default(),
IndexerConfig::example(),
IndexerConfig::for_test(),
rpc_config,
&prometheus::Registry::new(),
CancellationToken::new(),
Expand Down
Loading

0 comments on commit 66a1c18

Please sign in to comment.