Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 141 additions & 18 deletions crates/walrus-e2e-tests/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#[cfg(msim)]
use std::sync::atomic::{AtomicU32, Ordering};
#[cfg(not(msim))]
use std::thread;
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
Expand Down Expand Up @@ -62,11 +64,11 @@ use walrus_sdk::{
Blocklist,
StoreArgs,
StoreBlobsApi as _,
StoreBlobsInBucketApi as _,
WalrusNodeClient,
byte_range_read_client::{ByteRangeReadClient, ByteRangeReadClientConfig},
client_types::WalrusStoreBlob,
quilt_client::QuiltClientConfig,
responses::{BlobStoreResult, QuiltStoreResult},
responses::{BlobBucketStoreResult, BlobStoreResult, QuiltStoreResult},
streaming::start_streaming_blob,
upload_relay_client::UploadRelayClient,
},
Expand Down Expand Up @@ -215,6 +217,126 @@ where
Ok(())
}

/// E2E test for storing blobs into a blob bucket; dispatches to
/// `test_store_blobs_in_bucket_inner` with a runtime setup appropriate for the build.
#[ignore = "ignore E2E tests by default"]
#[walrus_simtest]
async fn test_store_blobs_in_bucket() -> TestResult {
    // Under msim, the simulator provides the runtime, so the test body runs directly.
    // With cfg(not(msim)) the block below is compiled out, making this the tail expression.
    #[cfg(msim)]
    {
        test_store_blobs_in_bucket_inner().await
    }

    // Outside the simulator, run the body on a dedicated large-stack thread with its own
    // current-thread Tokio runtime (see `run_with_large_stack` for why).
    #[cfg(not(msim))]
    run_with_large_stack(|| {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?
            .block_on(test_store_blobs_in_bucket_inner())
    })
}

/// Body of `test_store_blobs_in_bucket`: creates a blob bucket, stores blobs into it,
/// and verifies the bucket's storage-pool accounting, blob certification, and read-back.
async fn test_store_blobs_in_bucket_inner() -> TestResult {
    walrus_test_utils::init_tracing();

    let (_sui_cluster_handle, _cluster, client, system_context, _) =
        test_cluster::E2eTestSetupBuilder::new().build().await?;
    let blob_bucket_pkg_id = system_context
        .blob_bucket_pkg_id
        .expect("blob bucket package is published for tests");

    // Two random 10 kB blobs, stored as deletable for 2 epochs with store optimizations
    // disabled so the full store path is exercised.
    let blobs = walrus_test_utils::random_data_list(10_000, 2);
    let store_args = StoreArgs::default_with_epochs(2)
        .no_store_optimizations()
        .deletable();
    // Both blobs have the same unencoded size, so the encoded size of blobs[0] applies to all.
    let encoded_size = client
        .as_ref()
        .encoding_config()
        .get_for_type(store_args.encoding_type)
        .encoded_blob_length(blobs[0].len().try_into().unwrap())
        .unwrap();

    // Create a bucket whose pool initially reserves capacity for exactly one encoded blob,
    // so storing two blobs must grow the pool.
    let blob_bucket = client
        .as_ref()
        .sui_client()
        .create_blob_bucket(blob_bucket_pkg_id, encoded_size, 1)
        .await?;
    let initial_pool_status = client
        .as_ref()
        .sui_client()
        .blob_bucket_storage_pool_status(blob_bucket.bucket_object_id)
        .await?;
    assert_eq!(
        initial_pool_status.reserved_encoded_capacity_bytes,
        encoded_size
    );

    let store_results = client
        .as_ref()
        .reserve_and_store_blobs_in_bucket(blobs.clone(), blob_bucket, &store_args)
        .await?;

    // The store must have extended the pool's lifetime and grown its capacity to cover
    // every stored blob.
    let final_pool_status = client
        .as_ref()
        .sui_client()
        .blob_bucket_storage_pool_status(blob_bucket.bucket_object_id)
        .await?;
    assert!(
        final_pool_status.end_epoch > initial_pool_status.end_epoch,
        "bucket storage pool should have been extended"
    );
    assert!(
        final_pool_status.reserved_encoded_capacity_bytes >= encoded_size * blobs.len() as u64,
        "bucket storage pool should have increased capacity for all blobs"
    );

    // Every result must be a newly created, certified, deletable pooled blob that belongs
    // to this bucket's storage pool; any error result fails the test.
    let mut blob_ids = Vec::with_capacity(store_results.len());
    for store_result in store_results {
        match store_result {
            BlobBucketStoreResult::NewlyCreated { pooled_blob_object } => {
                assert_eq!(
                    pooled_blob_object.storage_pool_id,
                    blob_bucket.storage_pool_id
                );
                assert!(pooled_blob_object.deletable);
                assert!(pooled_blob_object.is_certified());
                blob_ids.push(pooled_blob_object.blob_id);
            }
            BlobBucketStoreResult::Error {
                blob_id,
                failure_phase,
                error_msg,
            } => panic!(
                "unexpected bucket store error for blob {blob_id:?} in {failure_phase}: {}",
                error_msg
            ),
        }
    }

    // Read each blob back and verify it round-trips byte-for-byte.
    // NOTE(review): this zip assumes results come back in input order — presumably
    // guaranteed by the store API's identifier-sorted results; confirm.
    for (blob_id, expected_data) in blob_ids.into_iter().zip(blobs) {
        let read_data = client.as_ref().read_blob::<Primary>(&blob_id).await?;
        assert_eq!(read_data, expected_data);
    }

    Ok(())
}

/// Runs `f` to completion on a freshly spawned thread with an enlarged stack and returns
/// its result.
///
/// The bucket e2e path drives a deep Sui/Move setup that can overflow the default
/// test-thread stack on CI runners, so the work is moved onto a thread whose stack size
/// is set explicitly. A panic on that thread is converted into an error instead of
/// propagating.
#[cfg(not(msim))]
fn run_with_large_stack<T>(f: impl FnOnce() -> TestResult<T> + Send + 'static) -> TestResult<T>
where
    T: Send + 'static,
{
    // 64 MiB is comfortably above the observed peak usage of the setup path.
    const STACK_SIZE_BYTES: usize = 64 * 1024 * 1024;

    let handle = thread::Builder::new()
        .name("large-stack-e2e-test".to_string())
        .stack_size(STACK_SIZE_BYTES)
        .spawn(f)?;

    match handle.join() {
        // The closure ran to completion; pass its own Ok/Err through unchanged.
        Ok(outcome) => outcome,
        // The thread panicked; surface the panic payload as a test error.
        Err(panic) => {
            let err: Box<dyn std::error::Error + Send + Sync> =
                format!("large-stack test thread panicked: {panic:?}").into();
            Err(err)?
        }
    }
}

async_param_test! {
#[ignore = "ignore E2E tests by default"]
#[walrus_simtest]
Expand Down Expand Up @@ -790,22 +912,23 @@ async fn test_store_with_existing_storage_resource(

let blobs = walrus_test_utils::random_data_list(31415, 4);
let encoding_type = DEFAULT_ENCODING;
let unencoded_blobs = blobs
.iter()
.cloned()
.enumerate()
.map(|(i, data)| {
WalrusStoreBlob::new_unencoded(
data,
format!("test-{i:02}"),
BlobAttribute::default(),
client
.as_ref()
.encoding_config()
.get_for_type(encoding_type),
)
})
.collect();
let unencoded_blobs: Vec<walrus_sdk::node_client::client_types::WalrusStoreBlobMaybeFinished> =
blobs
.iter()
.cloned()
.enumerate()
.map(|(i, data)| {
walrus_sdk::node_client::client_types::WalrusStoreBlobMaybeFinished::new_unencoded(
data,
format!("test-{i:02}"),
BlobAttribute::default(),
client
.as_ref()
.encoding_config()
.get_for_type(encoding_type),
)
})
.collect();
let encoded_blobs = walrus_sdk::node_client::encode_blobs(unencoded_blobs, None, None)?;
let encoded_sizes = encoded_blobs
.iter()
Expand Down
107 changes: 102 additions & 5 deletions crates/walrus-sdk/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ use walrus_core::{
};
use walrus_storage_node_client::{UploadIntent, api::BlobStatus, error::NodeError};
use walrus_sui::{
client::{CertifyAndExtendBlobResult, ExpirySelectionPolicy, ReadClient, SuiContractClient},
client::{
BlobBucketHandle,
CertifyAndExtendBlobResult,
ExpirySelectionPolicy,
ReadClient,
SuiContractClient,
},
types::{
Blob,
BlobEvent,
Expand Down Expand Up @@ -100,7 +106,7 @@ use crate::{
quilt_client::QuiltClient,
refresh::{CommitteesRefresherHandle, RequestKind, are_current_previous_different},
resource::{PriceComputation, ResourceManager},
responses::{BlobStoreResult, BlobStoreResultWithPath},
responses::{BlobBucketStoreResult, BlobStoreResult, BlobStoreResultWithPath},
upload_relay_client::UploadRelayClient,
},
uploader::{DistributedUploader, RunOutput, TailHandling, UploaderEvent},
Expand Down Expand Up @@ -2879,6 +2885,60 @@ mod internal {
Ok(results.into_iter().map(|blob| blob.state).collect())
}
}

/// Encodes the blobs, stores them in the specified blob bucket, uploads the slivers to the
/// storage nodes, and certifies the pooled blobs.
///
/// Returns one `BlobBucketStoreResult` per input blob, sorted by blob identifier. Blobs
/// that fail during encoding are returned as finished (failed) results without being sent
/// to the store backend.
fn reserve_and_store_blobs_in_bucket_inner(
    &self,
    walrus_store_blobs: Vec<
        WalrusStoreBlobMaybeFinished<UnencodedBlob, BlobBucketStoreResult>,
    >,
    blob_bucket: BlobBucketHandle,
    store_args: &StoreArgs,
) -> impl Future<Output = ClientResult<Vec<BlobBucketStoreResult>>> + Send {
    async move {
        let blobs_count = walrus_store_blobs.len();
        if blobs_count == 0 {
            tracing::debug!("no blobs provided to bucket store");
            return Ok(vec![]);
        }
        let start = Instant::now();

        // Encoding is CPU-bound: run it on the blocking pool so the async runtime
        // is not stalled. The outer `?` handles task-join failure, the inner one the
        // encoding result itself.
        let upload_relay_client = store_args.upload_relay_client.clone();
        let encoding_event_tx = store_args.encoding_event_tx.clone();
        let maybe_encoded_blobs = tokio::task::spawn_blocking(move || {
            encode_blobs(
                walrus_store_blobs,
                upload_relay_client,
                encoding_event_tx.as_ref(),
            )
        })
        .await
        .map_err(ClientError::other)??;
        // Blobs that already failed in encoding are final results; only successfully
        // encoded blobs proceed to the bucket store backend.
        let (encoded_blobs, mut results) =
            client_types::partition_unfinished_finished(maybe_encoded_blobs);
        store_args.maybe_observe_encoding_latency(start.elapsed());

        let client = self.client().await?;

        if !encoded_blobs.is_empty() {
            // Construct the backend once and reuse it for both the log line and the
            // store call (previously it was built twice, and the encoded blobs were
            // needlessly cloned before being handed to the backend).
            let backend = client.blob_bucket_store_backend(blob_bucket);
            tracing::debug!(
                backend = ?backend.kind(),
                "selected blob bucket store backend"
            );
            let store_results = backend
                .reserve_and_store_encoded_blobs(encoded_blobs, store_args)
                .await?;
            results.extend(store_results);
        }

        debug_assert_eq!(results.len(), blobs_count);
        // Sort by identifier so the output order is deterministic regardless of the
        // order in which blobs finished.
        results.sort_by_key(|blob| blob.common.identifier.to_string());

        Ok(results.into_iter().map(|blob| blob.state).collect())
    }
}
}
}

Expand Down Expand Up @@ -3000,18 +3060,55 @@ impl StoreBlobsApi for WalrusNodeClient<SuiContractClient> {
}
}

/// A trait containing functions to store blobs into a blob bucket.
pub trait StoreBlobsInBucketApi: StoreBlobsApi + Sized {
    /// Stores a list of blobs into the specified blob bucket.
    ///
    /// Unlike the owned-blob store path, this does not automatically retry across epoch changes:
    /// pooled blob registration is not idempotent, so an automatic full retry could create
    /// duplicate pooled blobs in the bucket.
    // NOTE(review): the instrument attribute declares a `blob_id` span field, but nothing
    // in this method records it — presumably a callee records it inside this span; confirm.
    #[tracing::instrument(skip_all, fields(blob_id))]
    fn reserve_and_store_blobs_in_bucket(
        &self,
        blobs: Vec<Vec<u8>>,
        blob_bucket: BlobBucketHandle,
        store_args: &StoreArgs,
    ) -> impl Future<Output = ClientResult<Vec<BlobBucketStoreResult>>> + Send {
        async {
            // Wrap the raw byte vectors as unencoded store blobs with default
            // identifiers; the empty vec means no per-blob attributes are attached.
            let walrus_store_blobs =
                WalrusStoreBlobMaybeFinished::unencoded_blobs_with_default_identifiers(
                    blobs,
                    vec![],
                    self.encoding_config()
                        .await?
                        .get_for_type(store_args.encoding_type),
                );
            // Delegate encoding, reservation, upload, and certification to the
            // internal implementation.
            self.reserve_and_store_blobs_in_bucket_inner(
                walrus_store_blobs,
                blob_bucket,
                store_args,
            )
            .await
        }
    }
}

// Bucket-store support for clients constructed lazily in the background; all behavior
// comes from the trait's default methods.
impl StoreBlobsInBucketApi for WalrusNodeClientCreatedInBackground<SuiContractClient> {}

// Bucket-store support for directly constructed node clients.
impl StoreBlobsInBucketApi for WalrusNodeClient<SuiContractClient> {}

/// Encodes multiple blobs.
///
/// Returns a list of WalrusStoreBlob as the encoded result. The return list
/// is in the same order as the input list.
/// A WalrusStoreBlob::Encoded is returned if the blob is encoded successfully.
/// A WalrusStoreBlob::Failed is returned if the blob fails to encode.
#[tracing::instrument(skip_all, fields(count = walrus_store_blobs.len()))]
pub fn encode_blobs(
walrus_store_blobs: Vec<WalrusStoreBlobMaybeFinished<UnencodedBlob>>,
pub fn encode_blobs<F: client_types::WalrusStoreFinalResultApi>(
walrus_store_blobs: Vec<WalrusStoreBlobMaybeFinished<UnencodedBlob, F>>,
upload_relay_client: Option<Arc<UploadRelayClient>>,
encoding_event_tx: Option<&tokio::sync::mpsc::UnboundedSender<EncodingProgressEvent>>,
) -> ClientResult<Vec<WalrusStoreBlobMaybeFinished<EncodedBlob>>> {
) -> ClientResult<Vec<WalrusStoreBlobMaybeFinished<EncodedBlob, F>>> {
let total_blobs_count = walrus_store_blobs.len();
if total_blobs_count == 0 {
return Ok(Vec::new());
Expand Down
Loading
Loading