From 7d51f899a5d7177495c3e4a34ea093210e42c849 Mon Sep 17 00:00:00 2001
From: Ashok Menon
Date: Wed, 12 Feb 2025 14:46:21 +0000
Subject: [PATCH] rpc-alt: testing query transaction + pruning

## Description

Adding a test to make sure that transaction querying continues to work in the
presence of pruning, because there is some tricky logic related to applying a
lower bound to the filtered results.

At the same time, I also created a testing-specific config for the indexer,
which speeds up polling but reduces concurrency -- this shaves off about 10 to
15s from local testing runs overall, and 30s from the newly added test (which
generates a lot of checkpoints).

## Test plan

```
sui$ cargo nextest run -p sui-indexer-alt-e2e-tests \
  --test transaction_pruning_tests
```
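For reference, `IndexerConfig::for_test()` layers test-friendly overrides on
top of `IndexerConfig::example()`, and tests can customise it further, either
by struct update or by a further merge. A sketch of both patterns as they are
used in this patch (`lag` stands in for a real retention value):

```
// Struct update: swap out a whole section of the test config.
let config = IndexerConfig {
    pipeline,
    ..IndexerConfig::for_test()
};

// Merge: layer a partial config over the test defaults.
let config = IndexerConfig::for_test().merge(IndexerConfig {
    consistency: PrunerLayer { retention: Some(lag), ..Default::default() },
    ..Default::default()
});
```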
---
 crates/sui-indexer-alt-e2e-tests/src/lib.rs   |  62 +++-
 .../tests/name_service_tests.rs               |   2 +-
 .../tests/transaction_pruning_tests.rs        | 327 ++++++++++++++++++
 .../tests/transactional_tests.rs              |  13 +-
 crates/sui-indexer-alt/src/config.rs          |  44 ++-
 5 files changed, 432 insertions(+), 16 deletions(-)
 create mode 100644 crates/sui-indexer-alt-e2e-tests/tests/transaction_pruning_tests.rs

diff --git a/crates/sui-indexer-alt-e2e-tests/src/lib.rs b/crates/sui-indexer-alt-e2e-tests/src/lib.rs
index 694c0d89d89c7f..f7a8f837690d7e 100644
--- a/crates/sui-indexer-alt-e2e-tests/src/lib.rs
+++ b/crates/sui-indexer-alt-e2e-tests/src/lib.rs
@@ -8,7 +8,7 @@ use std::{
 };
 
 use anyhow::{bail, Context};
-use diesel::{ExpressionMethods, QueryDsl};
+use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
 use diesel_async::RunQueryDsl;
 use simulacrum::Simulacrum;
 use sui_indexer_alt::{config::IndexerConfig, setup_indexer};
@@ -192,7 +192,8 @@ impl FullCluster {
         self.offchain.latest_checkpoint().await
     }
 
-    /// Waits until the indexer has caught up to the given checkpoint, or the timeout is reached.
+    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
+    /// reached (an error).
     pub async fn wait_for_checkpoint(
         &self,
         checkpoint: u64,
@@ -201,6 +202,19 @@ impl FullCluster {
         self.offchain.wait_for_checkpoint(checkpoint, timeout).await
     }
 
+    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
+    /// `pipeline`, or the `timeout` is reached (an error).
+    pub async fn wait_for_pruner(
+        &self,
+        pipeline: &str,
+        checkpoint: u64,
+        timeout: Duration,
+    ) -> Result<(), Elapsed> {
+        self.offchain
+            .wait_for_pruner(pipeline, checkpoint, timeout)
+            .await
+    }
+
     /// Triggers cancellation of all downstream services, waits for them to stop, cleans up the
     /// temporary database, and the temporary directory used for ingestion.
     pub async fn stopped(self) {
@@ -320,7 +334,28 @@ impl OffchainCluster {
         Ok(latest.into_values().min().map(|l| l as u64))
     }
 
-    /// Waits until the indexer has caught up to the given checkpoint, or the timeout is reached.
+    /// The checkpoint that the pruner for `pipeline` has caught up to (its `reader_lo`), if set.
+    pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
+        use watermarks::dsl as w;
+
+        let mut conn = self
+            .db
+            .connect()
+            .await
+            .context("Failed to connect to database")?;
+
+        let latest: Option<i64> = w::watermarks
+            .select(w::reader_lo)
+            .filter(w::pipeline.eq(pipeline))
+            .first(&mut conn)
+            .await
+            .optional()?;
+
+        Ok(latest.map(|l| l as u64))
+    }
+
+    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
+    /// reached (an error).
     pub async fn wait_for_checkpoint(
         &self,
         checkpoint: u64,
@@ -331,13 +366,32 @@ impl OffchainCluster {
         timeout: Duration,
     ) -> Result<(), Elapsed> {
         tokio::time::timeout(timeout, async move {
             loop {
                 if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
                     break;
                 } else {
-                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                    tokio::time::sleep(Duration::from_millis(200)).await;
                 }
             }
         })
         .await
     }
 
+    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
+    /// `pipeline`, or the `timeout` is reached (an error).
+    pub async fn wait_for_pruner(
+        &self,
+        pipeline: &str,
+        checkpoint: u64,
+        timeout: Duration,
+    ) -> Result<(), Elapsed> {
+        tokio::time::timeout(timeout, async move {
+            loop {
+                if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
+                    break;
+                } else {
+                    tokio::time::sleep(Duration::from_millis(200)).await;
+                }
+            }
+        }).await
+    }
+
     /// Triggers cancellation of all downstream services, waits for them to stop, and cleans up the
     /// temporary database.
     pub async fn stopped(self) {
diff --git a/crates/sui-indexer-alt-e2e-tests/tests/name_service_tests.rs b/crates/sui-indexer-alt-e2e-tests/tests/name_service_tests.rs
index 036a3a3d3cd74f..ee98080ba21203 100644
--- a/crates/sui-indexer-alt-e2e-tests/tests/name_service_tests.rs
+++ b/crates/sui-indexer-alt-e2e-tests/tests/name_service_tests.rs
@@ -408,7 +408,7 @@ impl SuiNSCluster {
             sim,
             IndexerArgs::default(),
             SystemPackageTaskArgs::default(),
-            IndexerConfig::example(),
+            IndexerConfig::for_test(),
             rpc_config,
             &prometheus::Registry::new(),
             CancellationToken::new(),
diff --git a/crates/sui-indexer-alt-e2e-tests/tests/transaction_pruning_tests.rs b/crates/sui-indexer-alt-e2e-tests/tests/transaction_pruning_tests.rs
new file mode 100644
index 00000000000000..d0a733da9e59a8
--- /dev/null
+++ b/crates/sui-indexer-alt-e2e-tests/tests/transaction_pruning_tests.rs
@@ -0,0 +1,327 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//! These tests check that transaction queries respond correctly to pruning, especially given that
+//! there is some tricky logic for applying a lower bound from pruned tables to filtered results.
+
+use std::{str::FromStr, time::Duration};
+
+use reqwest::Client;
+use serde_json::{json, Value};
+use simulacrum::Simulacrum;
+use sui_indexer_alt::config::{ConcurrentLayer, IndexerConfig, PipelineLayer, PrunerLayer};
+use sui_indexer_alt_e2e_tests::{find_address_owned, FullCluster};
+use sui_indexer_alt_framework::IndexerArgs;
+use sui_indexer_alt_jsonrpc::{
+    config::RpcConfig, data::system_package_task::SystemPackageTaskArgs,
+};
+use sui_macros::sim_test;
+use sui_types::{
+    base_types::SuiAddress,
+    crypto::{get_account_key_pair, Signature, Signer},
+    digests::TransactionDigest,
+    effects::TransactionEffectsAPI,
+    programmable_transaction_builder::ProgrammableTransactionBuilder,
+    transaction::{Transaction, TransactionData},
+};
+use tokio_util::sync::CancellationToken;
+
+/// 5 SUI gas budget
+const DEFAULT_GAS_BUDGET: u64 = 5_000_000_000;
+
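+// NB. Being a macro (rather than an async fn) lets `check_tx_digests` accept the different
+// iterator types its call sites pass for the expected digests (full ranges, slices, and their
+// `.rev()` forms) without needing generic bounds.
+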
+// Check that querying transactions by sender works when fetching transactions all in one go, and
+// paginated, in ascending and descending order, for both `a` and `b`.
+macro_rules! check_tx_digests {
+    ($cluster:expr, $sender:expr, $desc:expr, $expect:expr) => {{
+        let cluster = $cluster;
+        let desc = $desc;
+        let sender = $sender;
+
+        let (all_txs, _) = query_transactions(cluster, sender, None, 100, desc).await;
+
+        let mut next = None;
+        let mut paginated = vec![];
+        loop {
+            let (page, cursor) = query_transactions(cluster, sender, next, 4, desc).await;
+            paginated.extend(page);
+
+            next = cursor;
+            if next.is_none() {
+                break;
+            }
+        }
+
+        let expect: Vec<_> = $expect.copied().collect();
+
+        assert_eq!(all_txs, expect, "Mismatch fetching all transactions");
+        assert_eq!(paginated, expect, "Mismatch fetching paged transactions");
+    }};
+}
+
+/// Set up a cluster where the filter (`tx_affected_addresses`) table is pruned more than the
+/// digests table, and check that RPC calls querying transactions respect both pruning
+/// configurations.
+#[sim_test]
+async fn test_filter_pruned() {
+    let mut cluster = cluster_with_pipelines(PipelineLayer {
+        tx_affected_addresses: Some(concurrent_pipeline(5)),
+        tx_digests: Some(concurrent_pipeline(10)),
+        kv_transactions: Some(ConcurrentLayer::default()),
+        cp_sequence_numbers: Some(ConcurrentLayer::default()),
+        ..Default::default()
+    })
+    .await;
+
+    let (a, akp) = get_account_key_pair();
+    let (b, bkp) = get_account_key_pair();
+
+    let mut a_txs = vec![];
+    let mut b_txs = vec![];
+
+    // (1) Create 5 checkpoints with transactions from `a` and `b`. Each checkpoint contains two
+    // transactions from `a` and one from `b`. At this point nothing should be pruned on either
+    // side.
+    for _ in 0..5 {
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        b_txs.push(transfer_dust(&mut cluster, b, &bkp, a));
+        cluster.create_checkpoint().await.unwrap();
+    }
+
+    check_tx_digests!(&cluster, a, false, a_txs.iter());
+    check_tx_digests!(&cluster, b, false, b_txs.iter());
+    check_tx_digests!(&cluster, a, true, a_txs.iter().rev());
+    check_tx_digests!(&cluster, b, true, b_txs.iter().rev());
+
+    // (2) Add 5 more checkpoints; now the filter table is pruned, but the digests are not.
+    for _ in 0..5 {
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        b_txs.push(transfer_dust(&mut cluster, b, &bkp, a));
+        cluster.create_checkpoint().await.unwrap();
+    }
+
+    cluster
+        .wait_for_pruner("tx_affected_addresses", 5, Duration::from_secs(10))
+        .await
+        .unwrap();
+
+    check_tx_digests!(&cluster, a, false, a_txs[10..].iter());
+    check_tx_digests!(&cluster, b, false, b_txs[5..].iter());
+    check_tx_digests!(&cluster, a, true, a_txs[10..].iter().rev());
+    check_tx_digests!(&cluster, b, true, b_txs[5..].iter().rev());
+
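+    // (The slices above follow from the retention arithmetic: with 10 checkpoints and a
+    // retention of 5, the filter table's `reader_lo` is now 5, and the pruned checkpoints
+    // 0..5 each held two of `a`'s transactions and one of `b`'s.)
+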
+    // (3) Last 5 checkpoints; now both tables have been pruned.
+    for _ in 0..5 {
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        b_txs.push(transfer_dust(&mut cluster, b, &bkp, a));
+        cluster.create_checkpoint().await.unwrap();
+    }
+
+    cluster
+        .wait_for_pruner("tx_digests", 5, Duration::from_secs(10))
+        .await
+        .unwrap();
+
+    cluster
+        .wait_for_pruner("tx_affected_addresses", 10, Duration::from_secs(10))
+        .await
+        .unwrap();
+
+    check_tx_digests!(&cluster, a, false, a_txs[20..].iter());
+    check_tx_digests!(&cluster, b, false, b_txs[10..].iter());
+    check_tx_digests!(&cluster, a, true, a_txs[20..].iter().rev());
+    check_tx_digests!(&cluster, b, true, b_txs[10..].iter().rev());
+}
+
+/// The same as the test above, but this time the digests are pruned more than the filter.
+#[sim_test]
+async fn test_digests_pruned() {
+    let mut cluster = cluster_with_pipelines(PipelineLayer {
+        tx_affected_addresses: Some(concurrent_pipeline(10)),
+        tx_digests: Some(concurrent_pipeline(5)),
+        kv_transactions: Some(ConcurrentLayer::default()),
+        cp_sequence_numbers: Some(ConcurrentLayer::default()),
+        ..Default::default()
+    })
+    .await;
+
+    let (a, akp) = get_account_key_pair();
+    let (b, bkp) = get_account_key_pair();
+
+    let mut a_txs = vec![];
+    let mut b_txs = vec![];
+
+    // (1) Create 5 checkpoints with transactions from `a` and `b`. Each checkpoint contains two
+    // transactions from `a` and one from `b`. At this point nothing should be pruned on either
+    // side.
+    for _ in 0..5 {
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        b_txs.push(transfer_dust(&mut cluster, b, &bkp, a));
+        cluster.create_checkpoint().await.unwrap();
+    }
+
+    check_tx_digests!(&cluster, a, false, a_txs.iter());
+    check_tx_digests!(&cluster, b, false, b_txs.iter());
+    check_tx_digests!(&cluster, a, true, a_txs.iter().rev());
+    check_tx_digests!(&cluster, b, true, b_txs.iter().rev());
+
+    // (2) Add 5 more checkpoints; now the digests table is pruned, but the filters are not.
+    for _ in 0..5 {
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        b_txs.push(transfer_dust(&mut cluster, b, &bkp, a));
+        cluster.create_checkpoint().await.unwrap();
+    }
+
+    cluster
+        .wait_for_pruner("tx_digests", 5, Duration::from_secs(10))
+        .await
+        .unwrap();
+
+    check_tx_digests!(&cluster, a, false, a_txs[10..].iter());
+    check_tx_digests!(&cluster, b, false, b_txs[5..].iter());
+    check_tx_digests!(&cluster, a, true, a_txs[10..].iter().rev());
+    check_tx_digests!(&cluster, b, true, b_txs[5..].iter().rev());
+
+    // (3) Last 5 checkpoints; now both tables have been pruned.
+    for _ in 0..5 {
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        a_txs.push(transfer_dust(&mut cluster, a, &akp, b));
+        b_txs.push(transfer_dust(&mut cluster, b, &bkp, a));
+        cluster.create_checkpoint().await.unwrap();
+    }
+
+    cluster
+        .wait_for_pruner("tx_digests", 10, Duration::from_secs(10))
+        .await
+        .unwrap();
+
+    cluster
+        .wait_for_pruner("tx_affected_addresses", 5, Duration::from_secs(10))
+        .await
+        .unwrap();
+
+    check_tx_digests!(&cluster, a, false, a_txs[20..].iter());
+    check_tx_digests!(&cluster, a, true, a_txs[20..].iter().rev());
+    check_tx_digests!(&cluster, b, false, b_txs[10..].iter());
+    check_tx_digests!(&cluster, b, true, b_txs[10..].iter().rev());
+}
+
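+// Note on pipeline selection in the tests above: `kv_transactions` backs the transaction
+// responses themselves, and `cp_sequence_numbers` is what pruners use to translate a
+// checkpoint-based retention into transaction ranges, so both run alongside the two tables
+// under test.
+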
+/// Set up a cluster with a custom configuration for pipelines.
+async fn cluster_with_pipelines(pipeline: PipelineLayer) -> FullCluster {
+    FullCluster::new_with_configs(
+        Simulacrum::new(),
+        IndexerArgs::default(),
+        SystemPackageTaskArgs::default(),
+        IndexerConfig {
+            pipeline,
+            ..IndexerConfig::for_test()
+        },
+        RpcConfig::example(),
+        &prometheus::Registry::new(),
+        CancellationToken::new(),
+    )
+    .await
+    .expect("Failed to create cluster")
+}
+
+/// Create a configuration for a concurrent pipeline with pruning configured to retain `retention`
+/// checkpoints.
+fn concurrent_pipeline(retention: u64) -> ConcurrentLayer {
+    ConcurrentLayer {
+        pruner: Some(PrunerLayer {
+            retention: Some(retention),
+            ..Default::default()
+        }),
+        ..Default::default()
+    }
+}
+
+/// Request gas from the "faucet" in `cluster`, and craft a transaction transferring 1 MIST from
+/// `sender` (signed for with `signer`) to `recipient`. Returns the digest of the transaction,
+/// after asserting that it succeeded.
+fn transfer_dust(
+    cluster: &mut FullCluster,
+    sender: SuiAddress,
+    signer: &dyn Signer<Signature>,
+    recipient: SuiAddress,
+) -> TransactionDigest {
+    let fx = cluster
+        .request_gas(sender, DEFAULT_GAS_BUDGET + 1)
+        .expect("Failed to request gas");
+
+    let gas = find_address_owned(&fx).expect("Failed to find gas object");
+
+    let mut builder = ProgrammableTransactionBuilder::new();
+    builder.transfer_sui(recipient, Some(1));
+
+    let data = TransactionData::new_programmable(
+        sender,
+        vec![gas],
+        builder.finish(),
+        DEFAULT_GAS_BUDGET,
+        cluster.reference_gas_price(),
+    );
+
+    let digest = data.digest();
+    let (fx, _) = cluster
+        .execute_transaction(Transaction::from_data_and_signer(data, vec![signer]))
+        .expect("Failed to execute transaction");
+
+    assert!(fx.status().is_ok());
+    digest
+}
+
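+// A sketch of the JSON-RPC exchange that `query_transactions` (below) performs -- field names
+// as used in its body, with addresses and digests abbreviated:
+//
+//   --> {"method": "suix_queryTransactionBlocks",
+//        "params": [{"filter": {"FromAddress": "0x.."}}, cursor, limit, descending]}
+//   <-- {"result": {"data": [{"digest": ".."}, ..], "hasNextPage": true, "nextCursor": ".."}}
+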
+/// Query a page of transactions sent by `sender` from the RPC on `cluster`. `cursor`, `limit`,
+/// and `descending` control the pagination of the request. Returns the page's digests, along
+/// with a cursor for the next page, if there is one.
+async fn query_transactions(
+    cluster: &FullCluster,
+    sender: SuiAddress,
+    cursor: Option<String>,
+    limit: usize,
+    descending: bool,
+) -> (Vec<TransactionDigest>, Option<String>) {
+    let query = json!({
+        "jsonrpc": "2.0",
+        "id": 1,
+        "method": "suix_queryTransactionBlocks",
+        "params": [
+            {
+                "filter": {
+                    "FromAddress": sender,
+                },
+            },
+            cursor,
+            limit,
+            descending
+        ]
+    });
+
+    let client = Client::new();
+    let response = client
+        .post(cluster.rpc_url())
+        .json(&query)
+        .send()
+        .await
+        .expect("Failed to send request");
+
+    let body: Value = response
+        .json()
+        .await
+        .expect("Failed to parse JSON-RPC response");
+
+    assert!(body["error"].is_null(), "RPC error: {}", body["error"]);
+
+    let mut digests = vec![];
+    for result in body["result"]["data"].as_array().unwrap() {
+        let digest = result["digest"].as_str().unwrap();
+        let digest = TransactionDigest::from_str(digest).unwrap();
+        digests.push(digest);
+    }
+
+    let has_next_page = body["result"]["hasNextPage"].as_bool().unwrap();
+    let cursor = has_next_page.then(|| body["result"]["nextCursor"].as_str().unwrap().to_owned());
+
+    (digests, cursor)
+}
diff --git a/crates/sui-indexer-alt-e2e-tests/tests/transactional_tests.rs b/crates/sui-indexer-alt-e2e-tests/tests/transactional_tests.rs
index 5e72d7a4ba6410..8522705e3edff5 100644
--- a/crates/sui-indexer-alt-e2e-tests/tests/transactional_tests.rs
+++ b/crates/sui-indexer-alt-e2e-tests/tests/transactional_tests.rs
@@ -15,7 +15,7 @@ use anyhow::{bail, Context};
 use prometheus::Registry;
 use reqwest::Client;
 use serde_json::{json, Value};
-use sui_indexer_alt::config::IndexerConfig;
+use sui_indexer_alt::config::{IndexerConfig, Merge, PrunerLayer};
 use sui_indexer_alt_e2e_tests::OffchainCluster;
 use sui_indexer_alt_framework::{ingestion::ClientArgs, IndexerArgs};
 use sui_indexer_alt_jsonrpc::{
@@ -113,10 +113,15 @@ async fn cluster(config: &OffChainConfig) -> Arc<OffchainCluster> {
     // that rely on this behaviour will always fail, but this is better than flaky behavior.
     let system_package_task_args = SystemPackageTaskArgs::default();
 
-    // The example config includes every pipeline, and we configure its consistent range using the
+    // The test config includes every pipeline, and we configure its consistent range using the
     // off-chain config that was passed in.
-    let mut indexer_config = IndexerConfig::example();
-    indexer_config.consistency.retention = Some(config.snapshot_config.snapshot_min_lag as u64);
+    let indexer_config = IndexerConfig::for_test().merge(IndexerConfig {
+        consistency: PrunerLayer {
+            retention: Some(config.snapshot_config.snapshot_min_lag as u64),
+            ..Default::default()
+        },
+        ..Default::default()
+    });
 
     let rpc_config = RpcConfig::example();
 
diff --git a/crates/sui-indexer-alt/src/config.rs b/crates/sui-indexer-alt/src/config.rs
index 23b37d6aa5926a..ed2f55882b3ab2 100644
--- a/crates/sui-indexer-alt/src/config.rs
+++ b/crates/sui-indexer-alt/src/config.rs
@@ -68,8 +68,8 @@ pub struct IngestionLayer {
 #[DefaultConfig]
 #[derive(Clone, Default, Debug)]
 pub struct SequentialLayer {
-    committer: Option<CommitterLayer>,
-    checkpoint_lag: Option<u64>,
+    pub committer: Option<CommitterLayer>,
+    pub checkpoint_lag: Option<u64>,
 
     #[serde(flatten)]
     pub extra: toml::Table,
@@ -78,8 +78,8 @@ pub struct SequentialLayer {
 #[DefaultConfig]
 #[derive(Clone, Default, Debug)]
 pub struct ConcurrentLayer {
-    committer: Option<CommitterLayer>,
-    pruner: Option<PrunerLayer>,
+    pub committer: Option<CommitterLayer>,
+    pub pruner: Option<PrunerLayer>,
 
     #[serde(flatten)]
     pub extra: toml::Table,
@@ -88,9 +88,9 @@ pub struct ConcurrentLayer {
 #[DefaultConfig]
 #[derive(Clone, Default, Debug)]
 pub struct CommitterLayer {
-    write_concurrency: Option<usize>,
-    collect_interval_ms: Option<u64>,
-    watermark_interval_ms: Option<u64>,
+    pub write_concurrency: Option<usize>,
+    pub collect_interval_ms: Option<u64>,
+    pub watermark_interval_ms: Option<u64>,
 
     #[serde(flatten)]
     pub extra: toml::Table,
@@ -163,6 +163,36 @@ impl IndexerConfig {
         example
     }
 
+    /// Generate a configuration suitable for testing. This is the same as the example
+    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
+    /// less time waiting.
+    pub fn for_test() -> Self {
+        Self::example().merge(IndexerConfig {
+            ingestion: IngestionLayer {
+                retry_interval_ms: Some(10),
+                ingest_concurrency: Some(1),
+                ..Default::default()
+            },
+            committer: CommitterLayer {
+                collect_interval_ms: Some(50),
+                watermark_interval_ms: Some(50),
+                write_concurrency: Some(1),
+                ..Default::default()
+            },
+            consistency: PrunerLayer {
+                interval_ms: Some(50),
+                delay_ms: Some(0),
+                ..Default::default()
+            },
+            pruner: PrunerLayer {
+                interval_ms: Some(50),
+                delay_ms: Some(0),
+                ..Default::default()
+            },
+            ..Default::default()
+        })
+    }
+
     pub fn finish(mut self) -> IndexerConfig {
         check_extra("top-level", mem::take(&mut self.extra));
         self