diff --git a/Cargo.lock b/Cargo.lock index 886d0a2c8ffeb..4cea0eeeadb36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9252,7 +9252,6 @@ dependencies = [ "parking_lot", "pot", "rand 0.9.0", - "rayon", "regex", "ringmap", "rstest", diff --git a/turbopack/crates/turbo-persistence-tools/src/main.rs b/turbopack/crates/turbo-persistence-tools/src/main.rs index 25fbe6d31201b..6a384bae92ab4 100644 --- a/turbopack/crates/turbo-persistence-tools/src/main.rs +++ b/turbopack/crates/turbo-persistence-tools/src/main.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use anyhow::{Context, Result, bail}; -use turbo_persistence::{MetaFileEntryInfo, TurboPersistence}; +use turbo_persistence::{MetaFileEntryInfo, SerialScheduler, TurboPersistence}; fn main() -> Result<()> { // Get CLI argument @@ -16,7 +16,7 @@ fn main() -> Result<()> { bail!("The provided path does not exist: {}", path.display()); } - let db = TurboPersistence::open_read_only(path)?; + let db: TurboPersistence = TurboPersistence::open_read_only(path)?; let meta_info = db .meta_info() .context("Failed to retrieve meta information")?; diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index d771b66855296..ac4e7e5cc45f3 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -22,7 +22,6 @@ memmap2 = "0.9.5" parking_lot = { workspace = true } qfilter = { version = "0.2.4", features = ["serde"] } quick_cache = { workspace = true } -rayon = { workspace = true } rustc-hash = { workspace = true } smallvec = { workspace = true } thread_local = { workspace = true } @@ -32,6 +31,7 @@ zstd = { version = "0.13.2", features = ["zdict_builder"] } [dev-dependencies] rand = { workspace = true, features = ["small_rng"] } +rayon = { workspace = true } tempfile = { workspace = true } [lints] diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs index ea8b04ab16e70..6637ea2c13e3c 100644 --- a/turbopack/crates/turbo-persistence/src/collector.rs +++ b/turbopack/crates/turbo-persistence/src/collector.rs @@ -1,3 +1,5 @@ +use std::mem::take; + use crate::{ ValueBuffer, collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey}, @@ -111,4 +113,11 @@ impl Collector { self.total_value_size = 0; self.entries.drain(..) } + + /// Clears the collector and drops the capacity + pub fn drop_contents(&mut self) { + drop(take(&mut self.entries)); + self.total_key_size = 0; + self.total_value_size = 0; + } } diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 58e35e7d19175..9936c9349cc8b 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -18,8 +18,6 @@ use jiff::Timestamp; use lzzzz::lz4::decompress; use memmap2::Mmap; use parking_lot::{Mutex, RwLock}; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use tracing::Span; pub use crate::compaction::selector::CompactConfig; use crate::{ @@ -36,6 +34,7 @@ use crate::{ merge_iter::MergeIter, meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange}, meta_file_builder::MetaFileBuilder, + parallel_scheduler::ParallelScheduler, sst_filter::SstFilter, static_sorted_file::{BlockCache, SstLookupResult}, static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file}, @@ -108,7 +107,8 @@ struct TrackedStats { /// TurboPersistence is a persistent key-value store. 
It is limited to a single writer at a time /// using a single write batch. It allows for concurrent reads. -pub struct TurboPersistence { +pub struct TurboPersistence { + parallel_scheduler: S, /// The path to the directory where the database is stored path: PathBuf, /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and @@ -148,9 +148,26 @@ pub struct CommitOptions { keys_written: u64, } -impl TurboPersistence { - fn new(path: PathBuf, read_only: bool) -> Self { +impl TurboPersistence { + /// Open a TurboPersistence database at the given path. + /// This will read the directory and might performance cleanup when the database was not closed + /// properly. Cleanup only requires to read a few bytes from a few files and to delete + /// files, so it's fast. + pub fn open(path: PathBuf) -> Result { + Self::open_with_parallel_scheduler(path, Default::default()) + } + + /// Open a TurboPersistence database at the given path in read only mode. + /// This will read the directory. No Cleanup is performed. + pub fn open_read_only(path: PathBuf) -> Result { + Self::open_read_only_with_parallel_scheduler(path, Default::default()) + } +} + +impl TurboPersistence { + fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self { Self { + parallel_scheduler, path, read_only, inner: RwLock::new(Inner { @@ -188,16 +205,19 @@ impl TurboPersistence { /// This will read the directory and might performance cleanup when the database was not closed /// properly. Cleanup only requires to read a few bytes from a few files and to delete /// files, so it's fast. - pub fn open(path: PathBuf) -> Result { - let mut db = Self::new(path, false); + pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result { + let mut db = Self::new(path, false, parallel_scheduler); db.open_directory(false)?; Ok(db) } /// Open a TurboPersistence database at the given path in read only mode. /// This will read the directory. No Cleanup is performed. - pub fn open_read_only(path: PathBuf) -> Result { - let mut db = Self::new(path, true); + pub fn open_read_only_with_parallel_scheduler( + path: PathBuf, + parallel_scheduler: S, + ) -> Result { + let mut db = Self::new(path, true, parallel_scheduler); db.open_directory(false)?; Ok(db) } @@ -341,16 +361,12 @@ impl TurboPersistence { meta_files.retain(|seq| !deleted_files.contains(seq)); meta_files.sort_unstable(); - let span = Span::current(); - let mut meta_files = meta_files - .into_par_iter() - .with_min_len(1) - .map(|seq| { - let _span = span.enter(); + let mut meta_files = self + .parallel_scheduler + .parallel_map_collect::<_, _, Result>>(&meta_files, |&seq| { let meta_file = MetaFile::open(&self.path, seq)?; Ok(meta_file) - }) - .collect::>>()?; + })?; let mut sst_filter = SstFilter::new(); for meta_file in meta_files.iter_mut().rev() { @@ -398,7 +414,7 @@ impl TurboPersistence { /// This data will only become visible after the WriteBatch is committed. pub fn write_batch( &self, - ) -> Result> { + ) -> Result> { if self.read_only { bail!("Cannot write to a read-only database"); } @@ -413,7 +429,11 @@ impl TurboPersistence { ); } let current = self.inner.read().current_sequence_number; - Ok(WriteBatch::new(self.path.clone(), current)) + Ok(WriteBatch::new( + self.path.clone(), + current, + self.parallel_scheduler.clone(), + )) } fn open_log(&self) -> Result> { @@ -432,7 +452,7 @@ impl TurboPersistence { /// visible to readers. 
pub fn commit_write_batch( &self, - mut write_batch: WriteBatch, + mut write_batch: WriteBatch, ) -> Result<()> { if self.read_only { unreachable!("It's not possible to create a write batch for a read-only database"); @@ -475,15 +495,13 @@ impl TurboPersistence { new_meta_files.sort_unstable_by_key(|(seq, _)| *seq); - let mut new_meta_files = new_meta_files - .into_par_iter() - .with_min_len(1) - .map(|(seq, file)| { + let mut new_meta_files = self + .parallel_scheduler + .parallel_map_collect_owned::<_, _, Result>>(new_meta_files, |(seq, file)| { file.sync_all()?; let meta_file = MetaFile::open(&self.path, seq)?; Ok(meta_file) - }) - .collect::>>()?; + })?; let mut sst_filter = SstFilter::new(); for meta_file in new_meta_files.iter_mut().rev() { @@ -777,7 +795,6 @@ impl TurboPersistence { let path = &self.path; let log_mutex = Mutex::new(()); - let span = Span::current(); struct PartialResultPerFamily { new_meta_file: Option<(u32, File)>, @@ -789,336 +806,337 @@ impl TurboPersistence { let mut compact_config = compact_config.clone(); let merge_jobs = sst_by_family - .iter() - .map(|ssts_with_ranges| { + .into_iter() + .enumerate() + .filter_map(|(family, ssts_with_ranges)| { if compact_config.max_merge_segment_count == 0 { - return Vec::new(); + return None; } let (merge_jobs, real_merge_job_size) = - get_merge_segments(ssts_with_ranges, &compact_config); + get_merge_segments(&ssts_with_ranges, &compact_config); compact_config.max_merge_segment_count -= real_merge_job_size; - merge_jobs + Some((family, ssts_with_ranges, merge_jobs)) }) .collect::>(); - let result = sst_by_family - .into_par_iter() - .zip(merge_jobs.into_par_iter()) - .with_min_len(1) - .enumerate() - .map(|(family, (ssts_with_ranges, merge_jobs))| { - let family = family as u32; - let _span = span.clone().entered(); - - if merge_jobs.is_empty() { - return Ok(PartialResultPerFamily { - new_meta_file: None, - new_sst_files: Vec::new(), - sst_seq_numbers_to_delete: Vec::new(), - blob_seq_numbers_to_delete: Vec::new(), - keys_written: 0, - }); - } - - { - let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX); - let guard = log_mutex.lock(); - let mut log = self.open_log()?; - writeln!( - log, - "Compaction for family {family} (coverage: {}, overlap: {}, duplication: \ - {} / {} MiB):", - metrics.coverage, - metrics.overlap, - metrics.duplication, - metrics.duplicated_size / 1024 / 1024 - )?; - for job in merge_jobs.iter() { - writeln!(log, " merge")?; - for i in job.iter() { - let seq = ssts_with_ranges[*i].seq; - let (min, max) = ssts_with_ranges[*i].range().into_inner(); - writeln!(log, " {seq:08} {min:016x}-{max:016x}")?; - } + let result = self + .parallel_scheduler + .parallel_map_collect_owned::<_, _, Result>>( + merge_jobs, + |(family, ssts_with_ranges, merge_jobs)| { + let family = family as u32; + + if merge_jobs.is_empty() { + return Ok(PartialResultPerFamily { + new_meta_file: None, + new_sst_files: Vec::new(), + sst_seq_numbers_to_delete: Vec::new(), + blob_seq_numbers_to_delete: Vec::new(), + keys_written: 0, + }); } - drop(guard); - } - // Later we will remove the merged files - let sst_seq_numbers_to_delete = merge_jobs - .iter() - .filter(|l| l.len() > 1) - .flat_map(|l| l.iter().copied()) - .map(|index| ssts_with_ranges[index].seq) - .collect::>(); - - // Merge SST files - let span = tracing::trace_span!("merge files"); - enum PartialMergeResult<'l> { - Merged { - new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>, - blob_seq_numbers_to_delete: Vec, - keys_written: u64, - }, - Move 
{ - seq: u32, - meta: StaticSortedFileBuilderMeta<'l>, - }, - } - let merge_result = merge_jobs - .into_par_iter() - .with_min_len(1) - .map(|indices| { - let _span = span.clone().entered(); - if indices.len() == 1 { - // If we only have one file, we can just move it - let index = indices[0]; - let meta_index = ssts_with_ranges[index].meta_index; - let index_in_meta = ssts_with_ranges[index].index_in_meta; - let meta_file = &meta_files[meta_index]; - let entry = meta_file.entry(index_in_meta); - let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data())); - let meta = StaticSortedFileBuilderMeta { - min_hash: entry.min_hash(), - max_hash: entry.max_hash(), - amqf, - key_compression_dictionary_length: entry - .key_compression_dictionary_length(), - value_compression_dictionary_length: entry - .value_compression_dictionary_length(), - block_count: entry.block_count(), - size: entry.size(), - entries: 0, - }; - return Ok(PartialMergeResult::Move { - seq: entry.sequence_number(), - meta, - }); + { + let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX); + let guard = log_mutex.lock(); + let mut log = self.open_log()?; + writeln!( + log, + "Compaction for family {family} (coverage: {}, overlap: {}, \ + duplication: {} / {} MiB):", + metrics.coverage, + metrics.overlap, + metrics.duplication, + metrics.duplicated_size / 1024 / 1024 + )?; + for job in merge_jobs.iter() { + writeln!(log, " merge")?; + for i in job.iter() { + let seq = ssts_with_ranges[*i].seq; + let (min, max) = ssts_with_ranges[*i].range().into_inner(); + writeln!(log, " {seq:08} {min:016x}-{max:016x}")?; + } } + drop(guard); + } - fn create_sst_file( - entries: &[LookupEntry], - total_key_size: usize, - total_value_size: usize, - path: &Path, + // Later we will remove the merged files + let sst_seq_numbers_to_delete = merge_jobs + .iter() + .filter(|l| l.len() > 1) + .flat_map(|l| l.iter().copied()) + .map(|index| ssts_with_ranges[index].seq) + .collect::>(); + + // Merge SST files + let span = tracing::trace_span!("merge files"); + enum PartialMergeResult<'l> { + Merged { + new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>, + blob_seq_numbers_to_delete: Vec, + keys_written: u64, + }, + Move { seq: u32, - ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)> - { - let _span = tracing::trace_span!("write merged sst file").entered(); - let (meta, file) = write_static_stored_file( - entries, - total_key_size, - total_value_size, - &path.join(format!("{seq:08}.sst")), - )?; - Ok((seq, file, meta)) - } - - let mut new_sst_files = Vec::new(); - - // Iterate all SST files - let iters = indices - .iter() - .map(|&index| { + meta: StaticSortedFileBuilderMeta<'l>, + }, + } + let merge_result = self + .parallel_scheduler + .parallel_map_collect_owned::<_, _, Result>>(merge_jobs, |indices| { + let _span = span.clone().entered(); + if indices.len() == 1 { + // If we only have one file, we can just move it + let index = indices[0]; let meta_index = ssts_with_ranges[index].meta_index; let index_in_meta = ssts_with_ranges[index].index_in_meta; - let meta = &meta_files[meta_index]; - meta.entry(index_in_meta) - .sst(meta)? 
- .iter(key_block_cache, value_block_cache) - }) - .collect::>>()?; - - let iter = MergeIter::new(iters.into_iter())?; - - // TODO figure out how to delete blobs when they are no longer - // referenced - let blob_seq_numbers_to_delete: Vec = Vec::new(); - - let mut keys_written = 0; - - let mut total_key_size = 0; - let mut total_value_size = 0; - let mut current: Option = None; - let mut entries = Vec::new(); - let mut last_entries = Vec::new(); - let mut last_entries_total_sizes = (0, 0); - for entry in iter { - let entry = entry?; - - // Remove duplicates - if let Some(current) = current.take() { - if current.key != entry.key { - let key_size = current.key.len(); - let value_size = current.value.size_in_sst(); - total_key_size += key_size; - total_value_size += value_size; - - if total_key_size + total_value_size - > DATA_THRESHOLD_PER_COMPACTED_FILE - || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE - { - let (selected_total_key_size, selected_total_value_size) = - last_entries_total_sizes; - swap(&mut entries, &mut last_entries); - last_entries_total_sizes = ( - total_key_size - key_size, - total_value_size - value_size, - ); - total_key_size = key_size; - total_value_size = value_size; - - if !entries.is_empty() { - let seq = - sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - - keys_written += entries.len() as u64; - new_sst_files.push(create_sst_file( - &entries, + let meta_file = &meta_files[meta_index]; + let entry = meta_file.entry(index_in_meta); + let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data())); + let meta = StaticSortedFileBuilderMeta { + min_hash: entry.min_hash(), + max_hash: entry.max_hash(), + amqf, + key_compression_dictionary_length: entry + .key_compression_dictionary_length(), + value_compression_dictionary_length: entry + .value_compression_dictionary_length(), + block_count: entry.block_count(), + size: entry.size(), + entries: 0, + }; + return Ok(PartialMergeResult::Move { + seq: entry.sequence_number(), + meta, + }); + } + + fn create_sst_file( + entries: &[LookupEntry], + total_key_size: usize, + total_value_size: usize, + path: &Path, + seq: u32, + ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)> + { + let _span = tracing::trace_span!("write merged sst file").entered(); + let (meta, file) = write_static_stored_file( + entries, + total_key_size, + total_value_size, + &path.join(format!("{seq:08}.sst")), + )?; + Ok((seq, file, meta)) + } + + let mut new_sst_files = Vec::new(); + + // Iterate all SST files + let iters = indices + .iter() + .map(|&index| { + let meta_index = ssts_with_ranges[index].meta_index; + let index_in_meta = ssts_with_ranges[index].index_in_meta; + let meta = &meta_files[meta_index]; + meta.entry(index_in_meta) + .sst(meta)? 
+ .iter(key_block_cache, value_block_cache) + }) + .collect::>>()?; + + let iter = MergeIter::new(iters.into_iter())?; + + // TODO figure out how to delete blobs when they are no longer + // referenced + let blob_seq_numbers_to_delete: Vec = Vec::new(); + + let mut keys_written = 0; + + let mut total_key_size = 0; + let mut total_value_size = 0; + let mut current: Option = None; + let mut entries = Vec::new(); + let mut last_entries = Vec::new(); + let mut last_entries_total_sizes = (0, 0); + for entry in iter { + let entry = entry?; + + // Remove duplicates + if let Some(current) = current.take() { + if current.key != entry.key { + let key_size = current.key.len(); + let value_size = current.value.size_in_sst(); + total_key_size += key_size; + total_value_size += value_size; + + if total_key_size + total_value_size + > DATA_THRESHOLD_PER_COMPACTED_FILE + || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE + { + let ( selected_total_key_size, selected_total_value_size, - path, - seq, - )?); - - entries.clear(); + ) = last_entries_total_sizes; + swap(&mut entries, &mut last_entries); + last_entries_total_sizes = ( + total_key_size - key_size, + total_value_size - value_size, + ); + total_key_size = key_size; + total_value_size = value_size; + + if !entries.is_empty() { + let seq = sequence_number + .fetch_add(1, Ordering::SeqCst) + + 1; + + keys_written += entries.len() as u64; + new_sst_files.push(create_sst_file( + &entries, + selected_total_key_size, + selected_total_value_size, + path, + seq, + )?); + + entries.clear(); + } } - } - entries.push(current); - } else { - // Override value + entries.push(current); + } else { + // Override value + } } + current = Some(entry); + } + if let Some(entry) = current { + total_key_size += entry.key.len(); + total_value_size += entry.value.size_in_sst(); + entries.push(entry); } - current = Some(entry); - } - if let Some(entry) = current { - total_key_size += entry.key.len(); - total_value_size += entry.value.size_in_sst(); - entries.push(entry); - } - // If we have one set of entries left, write them to a new SST file - if last_entries.is_empty() && !entries.is_empty() { - let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - - keys_written += entries.len() as u64; - new_sst_files.push(create_sst_file( - &entries, - total_key_size, - total_value_size, - path, - seq, - )?); - } else - // If we have two sets of entries left, merge them and - // split it into two SST files, to avoid having a - // single SST file that is very small. 
- if !last_entries.is_empty() { - last_entries.append(&mut entries); - - last_entries_total_sizes.0 += total_key_size; - last_entries_total_sizes.1 += total_value_size; - - let (part1, part2) = last_entries.split_at(last_entries.len() / 2); - - let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - - keys_written += part1.len() as u64; - new_sst_files.push(create_sst_file( - part1, - // We don't know the exact sizes so we estimate them - last_entries_total_sizes.0 / 2, - last_entries_total_sizes.1 / 2, - path, - seq1, - )?); - - keys_written += part2.len() as u64; - new_sst_files.push(create_sst_file( - part2, - last_entries_total_sizes.0 / 2, - last_entries_total_sizes.1 / 2, - path, - seq2, - )?); - } - Ok(PartialMergeResult::Merged { - new_sst_files, - blob_seq_numbers_to_delete, - keys_written, + // If we have one set of entries left, write them to a new SST file + if last_entries.is_empty() && !entries.is_empty() { + let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + keys_written += entries.len() as u64; + new_sst_files.push(create_sst_file( + &entries, + total_key_size, + total_value_size, + path, + seq, + )?); + } else + // If we have two sets of entries left, merge them and + // split it into two SST files, to avoid having a + // single SST file that is very small. + if !last_entries.is_empty() { + last_entries.append(&mut entries); + + last_entries_total_sizes.0 += total_key_size; + last_entries_total_sizes.1 += total_value_size; + + let (part1, part2) = last_entries.split_at(last_entries.len() / 2); + + let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + keys_written += part1.len() as u64; + new_sst_files.push(create_sst_file( + part1, + // We don't know the exact sizes so we estimate them + last_entries_total_sizes.0 / 2, + last_entries_total_sizes.1 / 2, + path, + seq1, + )?); + + keys_written += part2.len() as u64; + new_sst_files.push(create_sst_file( + part2, + last_entries_total_sizes.0 / 2, + last_entries_total_sizes.1 / 2, + path, + seq2, + )?); + } + Ok(PartialMergeResult::Merged { + new_sst_files, + blob_seq_numbers_to_delete, + keys_written, + }) }) - }) - .collect::>>() - .with_context(|| { - format!("Failed to merge database files for family {family}") - })?; - - let Some((sst_files_len, blob_delete_len)) = merge_result - .iter() - .map(|r| { - if let PartialMergeResult::Merged { - new_sst_files, - blob_seq_numbers_to_delete, - keys_written: _, - } = r - { - (new_sst_files.len(), blob_seq_numbers_to_delete.len()) - } else { - (0, 0) - } - }) - .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2)) - else { - unreachable!() - }; - - let mut new_sst_files = Vec::with_capacity(sst_files_len); - let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len); - - let mut meta_file_builder = MetaFileBuilder::new(family); - - let mut keys_written = 0; - for result in merge_result { - match result { - PartialMergeResult::Merged { - new_sst_files: merged_new_sst_files, - blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete, - keys_written: merged_keys_written, - } => { - for (seq, file, meta) in merged_new_sst_files { + .with_context(|| { + format!("Failed to merge database files for family {family}") + })?; + + let Some((sst_files_len, blob_delete_len)) = merge_result + .iter() + .map(|r| { + if let PartialMergeResult::Merged { + new_sst_files, + blob_seq_numbers_to_delete, + keys_written: _, + 
} = r + { + (new_sst_files.len(), blob_seq_numbers_to_delete.len()) + } else { + (0, 0) + } + }) + .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2)) + else { + unreachable!() + }; + + let mut new_sst_files = Vec::with_capacity(sst_files_len); + let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len); + + let mut meta_file_builder = MetaFileBuilder::new(family); + + let mut keys_written = 0; + for result in merge_result { + match result { + PartialMergeResult::Merged { + new_sst_files: merged_new_sst_files, + blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete, + keys_written: merged_keys_written, + } => { + for (seq, file, meta) in merged_new_sst_files { + meta_file_builder.add(seq, meta); + new_sst_files.push((seq, file)); + } + blob_seq_numbers_to_delete + .extend(merged_blob_seq_numbers_to_delete); + keys_written += merged_keys_written; + } + PartialMergeResult::Move { seq, meta } => { meta_file_builder.add(seq, meta); - new_sst_files.push((seq, file)); } - blob_seq_numbers_to_delete.extend(merged_blob_seq_numbers_to_delete); - keys_written += merged_keys_written; - } - PartialMergeResult::Move { seq, meta } => { - meta_file_builder.add(seq, meta); } } - } - for &seq in sst_seq_numbers_to_delete.iter() { - meta_file_builder.add_obsolete_sst_file(seq); - } + for &seq in sst_seq_numbers_to_delete.iter() { + meta_file_builder.add_obsolete_sst_file(seq); + } - let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - let meta_file = { - let _span = tracing::trace_span!("write meta file").entered(); - meta_file_builder.write(&self.path, seq)? - }; - - Ok(PartialResultPerFamily { - new_meta_file: Some((seq, meta_file)), - new_sst_files, - sst_seq_numbers_to_delete, - blob_seq_numbers_to_delete, - keys_written, - }) - }) - .collect::>>()?; + let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let meta_file = { + let _span = tracing::trace_span!("write meta file").entered(); + meta_file_builder.write(&self.path, seq)? 
+ }; + + Ok(PartialResultPerFamily { + new_meta_file: Some((seq, meta_file)), + new_sst_files, + sst_seq_numbers_to_delete, + blob_seq_numbers_to_delete, + keys_written, + }) + }, + )?; for PartialResultPerFamily { new_meta_file: inner_new_meta_file, diff --git a/turbopack/crates/turbo-persistence/src/lib.rs b/turbopack/crates/turbo-persistence/src/lib.rs index 70c87199f396c..f944e4b4d1202 100644 --- a/turbopack/crates/turbo-persistence/src/lib.rs +++ b/turbopack/crates/turbo-persistence/src/lib.rs @@ -13,19 +13,21 @@ mod db; mod key; mod lookup_entry; mod merge_iter; +mod meta_file; +mod meta_file_builder; +mod parallel_scheduler; +mod sst_filter; mod static_sorted_file; mod static_sorted_file_builder; +mod value_buf; mod write_batch; -mod meta_file; -mod meta_file_builder; -mod sst_filter; #[cfg(test)] mod tests; -mod value_buf; pub use arc_slice::ArcSlice; pub use db::{CompactConfig, MetaFileEntryInfo, MetaFileInfo, TurboPersistence}; pub use key::{KeyBase, QueryKey, StoreKey}; +pub use parallel_scheduler::{ParallelScheduler, SerialScheduler}; pub use value_buf::ValueBuffer; pub use write_batch::WriteBatch; diff --git a/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs b/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs new file mode 100644 index 0000000000000..52a9d626090fc --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs @@ -0,0 +1,137 @@ +pub trait ParallelScheduler: Clone + Sync + Send { + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) + where + T: Sync; + + fn try_parallel_for_each<'l, T, E>( + &self, + items: &'l [T], + f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Sync, + E: Send + 'static; + + fn try_parallel_for_each_mut<'l, T, E>( + &self, + items: &'l mut [T], + f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Send + Sync, + E: Send + 'static; + + fn try_parallel_for_each_owned( + &self, + items: Vec, + f: impl (Fn(T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Send + Sync, + E: Send + 'static; + + fn parallel_map_collect<'l, Item, PerItemResult, Result>( + &self, + items: &'l [Item], + f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, + ) -> Result + where + Item: Sync, + PerItemResult: Send + Sync + 'l, + Result: FromIterator; + + fn parallel_map_collect_owned( + &self, + items: Vec, + f: impl Fn(Item) -> PerItemResult + Send + Sync, + ) -> Result + where + Item: Send + Sync, + PerItemResult: Send + Sync, + Result: FromIterator; +} + +#[derive(Clone, Copy, Default)] +pub struct SerialScheduler; + +impl ParallelScheduler for SerialScheduler { + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) + where + T: Sync, + { + for item in items { + f(item); + } + } + + fn try_parallel_for_each<'l, T, E>( + &self, + items: &'l [T], + f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Sync, + E: Send, + { + for item in items { + f(item)?; + } + Ok(()) + } + + fn try_parallel_for_each_mut<'l, T, E>( + &self, + items: &'l mut [T], + f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Sync, + E: Send, + { + for item in items { + f(item)?; + } + Ok(()) + } + + fn try_parallel_for_each_owned( + &self, + items: Vec, + f: impl (Fn(T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Sync, + E: Send, + { + for item in items { + f(item)?; + } + Ok(()) + } + + fn parallel_map_collect<'l, Item, 
PerItemResult, Result>( + &self, + items: &'l [Item], + f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, + ) -> Result + where + Item: Sync, + PerItemResult: Send + Sync + 'l, + Result: FromIterator, + { + items.iter().map(f).collect() + } + + fn parallel_map_collect_owned( + &self, + items: Vec, + f: impl Fn(Item) -> PerItemResult + Send + Sync, + ) -> Result + where + Item: Send + Sync, + PerItemResult: Send + Sync, + Result: FromIterator, + { + items.into_iter().map(f).collect() + } +} diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs index 5c123611d8759..6e0b42b92fe78 100644 --- a/turbopack/crates/turbo-persistence/src/tests.rs +++ b/turbopack/crates/turbo-persistence/src/tests.rs @@ -6,28 +6,116 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use crate::{ constants::MAX_MEDIUM_VALUE_SIZE, db::{CompactConfig, TurboPersistence}, + parallel_scheduler::ParallelScheduler, write_batch::WriteBatch, }; +#[derive(Clone, Copy)] +struct RayonParallelScheduler; + +impl ParallelScheduler for RayonParallelScheduler { + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) + where + T: Sync, + { + items.into_par_iter().for_each(f); + } + + fn try_parallel_for_each<'l, T, E>( + &self, + items: &'l [T], + f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Sync, + E: Send, + { + items.into_par_iter().try_for_each(f) + } + + fn try_parallel_for_each_mut<'l, T, E>( + &self, + items: &'l mut [T], + f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Send + Sync, + E: Send, + { + items.into_par_iter().try_for_each(f) + } + + fn try_parallel_for_each_owned( + &self, + items: Vec, + f: impl (Fn(T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Send + Sync, + E: Send, + { + items.into_par_iter().try_for_each(f) + } + + fn parallel_map_collect<'l, Item, PerItemResult, Result>( + &self, + items: &'l [Item], + f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, + ) -> Result + where + Item: Sync, + PerItemResult: Send + Sync, + Result: FromIterator, + { + items + .into_par_iter() + .map(f) + .collect_vec_list() + .into_iter() + .flatten() + .collect() + } + + fn parallel_map_collect_owned( + &self, + items: Vec, + f: impl Fn(Item) -> PerItemResult + Send + Sync, + ) -> Result + where + Item: Send + Sync, + PerItemResult: Send + Sync, + Result: FromIterator, + { + items + .into_par_iter() + .map(f) + .collect_vec_list() + .into_iter() + .flatten() + .collect() + } +} + #[test] fn full_cycle() -> Result<()> { let mut test_cases = Vec::new(); type TestCases = Vec<( &'static str, - Box, 16>) -> Result<()>>, - Box Result<()>>, + Box, RayonParallelScheduler, 16>) -> Result<()>>, + Box) -> Result<()>>, )>; fn test_case( test_cases: &mut TestCases, name: &'static str, - write: impl Fn(&mut WriteBatch, 16>) -> Result<()> + 'static, - read: impl Fn(&TurboPersistence) -> Result<()> + 'static, + write: impl Fn(&mut WriteBatch, RayonParallelScheduler, 16>) -> Result<()> + 'static, + read: impl Fn(&TurboPersistence) -> Result<()> + 'static, ) { test_cases.push(( name, - Box::new(write) as Box, 16>) -> Result<()>>, - Box::new(read) as Box Result<()>>, + Box::new(write) + as Box, RayonParallelScheduler, 16>) -> Result<()>>, + Box::new(read) as Box) -> Result<()>>, )); } @@ -215,7 +303,10 @@ fn full_cycle() -> Result<()> { { let start = Instant::now(); - let db = TurboPersistence::open(path.to_path_buf())?; + let db = 
TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let mut batch = db.write_batch()?; write(&mut batch)?; db.commit_write_batch(batch)?; @@ -231,7 +322,10 @@ fn full_cycle() -> Result<()> { } { let start = Instant::now(); - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; println!("{name} restore time: {:?}", start.elapsed()); let start = Instant::now(); read(&db)?; @@ -257,7 +351,10 @@ fn full_cycle() -> Result<()> { } { let start = Instant::now(); - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; println!("{name} restore time after compact: {:?}", start.elapsed()); let start = Instant::now(); read(&db)?; @@ -291,7 +388,10 @@ fn full_cycle() -> Result<()> { { let start = Instant::now(); - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let mut batch = db.write_batch()?; for (_, write, _) in test_cases.iter() { write(&mut batch)?; @@ -311,7 +411,10 @@ fn full_cycle() -> Result<()> { } { let start = Instant::now(); - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; println!("All restore time: {:?}", start.elapsed()); for (name, _, read) in test_cases.iter() { let start = Instant::now(); @@ -343,7 +446,10 @@ fn full_cycle() -> Result<()> { { let start = Instant::now(); - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; println!("All restore time after compact: {:?}", start.elapsed()); for (name, _, read) in test_cases.iter() { @@ -383,13 +489,17 @@ fn persist_changes() -> Result<()> { let path = tempdir.path(); const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value - fn put(b: &WriteBatch<(u8, [u8; 4]), 1>, key: u8, value: u8) -> Result<()> { + fn put( + b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>, + key: u8, + value: u8, + ) -> Result<()> { for i in 0..(READ_COUNT * 10) { b.put(0, (key, i.to_be_bytes()), vec![value].into())?; } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -402,7 +512,10 @@ fn persist_changes() -> Result<()> { } { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let b = db.write_batch::<_, 1>()?; put(&b, 1, 11)?; put(&b, 2, 21)?; @@ -418,7 +531,10 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let b = db.write_batch::<_, 1>()?; put(&b, 1, 12)?; put(&b, 2, 22)?; @@ -432,7 +548,10 @@ fn persist_changes() -> Result<()> { } { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let b = db.write_batch::<_, 1>()?; put(&b, 
1, 13)?; db.commit_write_batch(b)?; @@ -446,7 +565,10 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; check(&db, 1, 13)?; check(&db, 2, 22)?; @@ -457,7 +579,10 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; db.compact(&CompactConfig { optimal_merge_count: 4, @@ -475,7 +600,10 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; check(&db, 1, 13)?; check(&db, 2, 22)?; @@ -493,13 +621,17 @@ fn partial_compaction() -> Result<()> { let path = tempdir.path(); const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value - fn put(b: &WriteBatch<(u8, [u8; 4]), 1>, key: u8, value: u8) -> Result<()> { + fn put( + b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>, + key: u8, + value: u8, + ) -> Result<()> { for i in 0..(READ_COUNT * 10) { b.put(0, (key, i.to_be_bytes()), vec![value].into())?; } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -516,7 +648,10 @@ fn partial_compaction() -> Result<()> { println!("--- Iteration {i} ---"); println!("Add more entries"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let b = db.write_batch::<_, 1>()?; put(&b, i, i)?; put(&b, i + 1, i)?; @@ -535,7 +670,10 @@ fn partial_compaction() -> Result<()> { println!("Compaction"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; db.compact(&CompactConfig { optimal_merge_count: 4, @@ -556,7 +694,10 @@ fn partial_compaction() -> Result<()> { println!("Restore check"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; for j in 0..i { check(&db, j, j)?; @@ -580,7 +721,11 @@ fn merge_file_removal() -> Result<()> { let _ = fs::remove_dir_all(path); const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value - fn put(b: &WriteBatch<(u8, [u8; 4]), 1>, key: u8, value: u32) -> Result<()> { + fn put( + b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>, + key: u8, + value: u32, + ) -> Result<()> { for i in 0..(READ_COUNT * 10) { b.put( 0, @@ -590,7 +735,7 @@ fn merge_file_removal() -> Result<()> { } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u32) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u32) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -608,7 +753,10 @@ fn merge_file_removal() -> Result<()> { { println!("--- Init ---"); - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let b = db.write_batch::<_, 1>()?; for j 
in 0..=255 { put(&b, j, 0)?; @@ -624,7 +772,10 @@ fn merge_file_removal() -> Result<()> { let i = i * 37; println!("Add more entries"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; let b = db.write_batch::<_, 1>()?; for j in iter_bits(i) { println!("Put {j} = {i}"); @@ -642,7 +793,10 @@ fn merge_file_removal() -> Result<()> { println!("Compaction"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; db.compact(&CompactConfig { optimal_merge_count: 4, @@ -660,7 +814,10 @@ fn merge_file_removal() -> Result<()> { println!("Restore check"); { - let db = TurboPersistence::open(path.to_path_buf())?; + let db = TurboPersistence::open_with_parallel_scheduler( + path.to_path_buf(), + RayonParallelScheduler, + )?; for j in 0..32 { check(&db, j, expected_values[j as usize])?; diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 490cf38e88a90..81a954d0ef18e 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -9,15 +9,11 @@ use std::{ use anyhow::{Context, Result}; use byteorder::{BE, WriteBytesExt}; +use either::Either; use lzzzz::lz4::{self, ACC_LEVEL_DEFAULT}; use parking_lot::Mutex; -use rayon::{ - iter::{Either, IndexedParallelIterator, IntoParallelIterator, ParallelIterator}, - scope, -}; use smallvec::SmallVec; use thread_local::ThreadLocal; -use tracing::Span; use crate::{ ValueBuffer, @@ -26,6 +22,7 @@ use crate::{ constants::{MAX_MEDIUM_VALUE_SIZE, THREAD_LOCAL_SIZE_SHIFT}, key::StoreKey, meta_file_builder::MetaFileBuilder, + parallel_scheduler::ParallelScheduler, static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file}, }; @@ -68,7 +65,9 @@ enum GlobalCollectorState { } /// A write batch. -pub struct WriteBatch { +pub struct WriteBatch { + /// Parallel scheduler + parallel_scheduler: S, /// The database path db_path: PathBuf, /// The current sequence number counter. Increased for every new SST file or blob file. @@ -84,13 +83,16 @@ pub struct WriteBatch { new_sst_files: Mutex>, } -impl WriteBatch { +impl + WriteBatch +{ /// Creates a new write batch for a database. - pub(crate) fn new(path: PathBuf, current: u32) -> Self { + pub(crate) fn new(path: PathBuf, current: u32, parallel_scheduler: S) -> Self { const { assert!(FAMILIES <= usize_from_u32(u32::MAX)); }; Self { + parallel_scheduler, db_path: path, current_sequence_number: AtomicU32::new(current), thread_locals: ThreadLocal::new(), @@ -223,13 +225,12 @@ impl WriteBatch { } } - let span = Span::current(); - collectors.into_par_iter().try_for_each(|mut collector| { - let _span = span.clone().entered(); - self.flush_thread_local_collector(family, &mut collector)?; - drop(collector); - anyhow::Ok(()) - })?; + self.parallel_scheduler + .try_parallel_for_each_owned(collectors, |mut collector| { + self.flush_thread_local_collector(family, &mut collector)?; + drop(collector); + anyhow::Ok(()) + })?; // Now we flush the global collector(s). 
let mut collector_state = self.collectors[usize_from_u32(family)].lock(); @@ -242,22 +243,22 @@ impl WriteBatch { } } GlobalCollectorState::Sharded(_) => { - let GlobalCollectorState::Sharded(shards) = replace( + let GlobalCollectorState::Sharded(mut shards) = replace( &mut *collector_state, GlobalCollectorState::Unsharded(Collector::new()), ) else { unreachable!(); }; - shards.into_par_iter().try_for_each(|mut collector| { - let _span = span.clone().entered(); - if !collector.is_empty() { - let sst = self.create_sst_file(family, collector.sorted())?; - collector.clear(); - self.new_sst_files.lock().push(sst); - drop(collector); - } - anyhow::Ok(()) - })?; + self.parallel_scheduler + .try_parallel_for_each_mut(&mut shards, |collector| { + if !collector.is_empty() { + let sst = self.create_sst_file(family, collector.sorted())?; + collector.clear(); + self.new_sst_files.lock().push(sst); + collector.drop_contents(); + } + anyhow::Ok(()) + })?; } } @@ -269,10 +270,9 @@ impl WriteBatch { #[tracing::instrument(level = "trace", skip(self))] pub(crate) fn finish(&mut self) -> Result { let mut new_blob_files = Vec::new(); - let shared_error = Mutex::new(Ok(())); // First, we flush all thread local collectors to the global collectors. - scope(|scope| { + { let _span = tracing::trace_span!("flush thread local collectors").entered(); let mut collectors = [const { Vec::new() }; FAMILIES]; for cell in self.thread_locals.iter_mut() { @@ -286,23 +286,24 @@ impl WriteBatch { } } } - for (family, thread_local_collectors) in collectors.into_iter().enumerate() { - for mut collector in thread_local_collectors { - let this = &self; - let shared_error = &shared_error; - let span = Span::current(); - scope.spawn(move |_| { - let _span = span.entered(); - if let Err(err) = - this.flush_thread_local_collector(family as u32, &mut collector) - { - *shared_error.lock() = Err(err); - } - drop(collector); - }); - } - } - }); + let to_flush = collectors + .into_iter() + .enumerate() + .flat_map(|(family, collector)| { + collector + .into_iter() + .map(move |collector| (family as u32, collector)) + }) + .collect::>(); + self.parallel_scheduler.try_parallel_for_each_owned( + to_flush, + |(family, mut collector)| { + self.flush_thread_local_collector(family, &mut collector)?; + drop(collector); + anyhow::Ok(()) + }, + )?; + } let _span = tracing::trace_span!("flush collectors").entered(); @@ -313,25 +314,24 @@ impl WriteBatch { let new_collectors = [(); FAMILIES].map(|_| Mutex::new(GlobalCollectorState::Unsharded(Collector::new()))); let collectors = replace(&mut self.collectors, new_collectors); - let span = Span::current(); - collectors - .into_par_iter() + let collectors = collectors + .into_iter() .enumerate() .flat_map(|(family, state)| { let collector = state.into_inner(); match collector { GlobalCollectorState::Unsharded(collector) => { - Either::Left([(family, collector)].into_par_iter()) + Either::Left([(family, collector)].into_iter()) + } + GlobalCollectorState::Sharded(shards) => { + Either::Right(shards.into_iter().map(move |collector| (family, collector))) } - GlobalCollectorState::Sharded(shards) => Either::Right( - shards - .into_par_iter() - .map(move |collector| (family, collector)), - ), } }) - .try_for_each(|(family, mut collector)| { - let _span = span.clone().entered(); + .collect::>(); + self.parallel_scheduler.try_parallel_for_each_owned( + collectors, + |(family, mut collector)| { let family = family as u32; if !collector.is_empty() { let sst = self.create_sst_file(family, collector.sorted())?; @@ 
-340,33 +340,37 @@ impl WriteBatch { shared_new_sst_files.lock().push(sst); } anyhow::Ok(()) - })?; - - shared_error.into_inner()?; + }, + )?; // Not we need to write the new meta files. let new_meta_collectors = [(); FAMILIES].map(|_| Mutex::new(Vec::new())); let meta_collectors = replace(&mut self.meta_collectors, new_meta_collectors); let keys_written = AtomicU64::new(0); - let new_meta_files = meta_collectors - .into_par_iter() + let file_to_write = meta_collectors + .into_iter() .map(|mutex| mutex.into_inner()) .enumerate() .filter(|(_, sst_files)| !sst_files.is_empty()) - .map(|(family, sst_files)| { - let family = family as u32; - let mut entries = 0; - let mut builder = MetaFileBuilder::new(family); - for (seq, sst) in sst_files { - entries += sst.entries; - builder.add(seq, sst); - } - keys_written.fetch_add(entries, Ordering::Relaxed); - let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - let file = builder.write(&self.db_path, seq)?; - Ok((seq, file)) - }) - .collect::>>()?; + .collect::>(); + let new_meta_files = self + .parallel_scheduler + .parallel_map_collect_owned::<_, _, Result>>( + file_to_write, + |(family, sst_files)| { + let family = family as u32; + let mut entries = 0; + let mut builder = MetaFileBuilder::new(family); + for (seq, sst) in sst_files { + entries += sst.entries; + builder.add(seq, sst); + } + keys_written.fetch_add(entries, Ordering::Relaxed); + let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let file = builder.write(&self.db_path, seq)?; + Ok((seq, file)) + }, + )?; // Finally we return the new files and sequence number. let seq = self.current_sequence_number.load(Ordering::SeqCst); diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml index eb3ee57b72093..3554ce1f31b20 100644 --- a/turbopack/crates/turbo-tasks-backend/Cargo.toml +++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml @@ -40,7 +40,6 @@ once_cell = { workspace = true } parking_lot = { workspace = true } pot = "3.0.0" rand = { workspace = true } -rayon = { workspace = true } ringmap = { workspace = true, features = ["serde"] } rustc-hash = { workspace = true } serde = { workspace = true } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 0bf9249ecb622..fbba6b1ab13d4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1267,7 +1267,6 @@ impl TurboTasksBackendInner { return task_id; } - self.track_cache_miss(&task_type); let tx = self .should_restore() .then(|| self.backing_storage.start_read_transaction()) @@ -1279,6 +1278,7 @@ impl TurboTasksBackendInner { .forward_lookup_task_cache(tx.as_ref(), &task_type) .expect("Failed to lookup task id") } { + self.track_cache_hit(&task_type); let _ = self.task_cache.try_insert(Arc::new(task_type), task_id); task_id } else { @@ -1287,12 +1287,14 @@ impl TurboTasksBackendInner { let task_id = if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) { + self.track_cache_hit(&task_type); // Safety: We just created the id and failed to insert it. 
unsafe { self.persisted_task_id_factory.reuse(task_id); } existing_task_id } else { + self.track_cache_miss(&task_type); task_id }; if let Some(log) = &self.persisted_task_cache_log { @@ -1327,10 +1329,10 @@ impl TurboTasksBackendInner { return task_id; } - self.track_cache_miss(&task_type); let task_type = Arc::new(task_type); let task_id = self.transient_task_id_factory.get(); - if let Err(existing_task_id) = self.task_cache.try_insert(task_type, task_id) { + if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) { + self.track_cache_hit(&task_type); // Safety: We just created the id and failed to insert it. unsafe { self.transient_task_id_factory.reuse(task_id); @@ -1339,6 +1341,7 @@ impl TurboTasksBackendInner { return existing_task_id; } + self.track_cache_miss(&task_type); self.connect_child(parent_task, task_id, turbo_tasks); task_id diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index ceab626298854..ba83f1da5a209 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -6,9 +6,8 @@ use std::{ }; use bitfield::bitfield; -use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use smallvec::SmallVec; -use turbo_tasks::{FxDashMap, TaskId}; +use turbo_tasks::{FxDashMap, TaskId, parallel}; use crate::{ backend::dynamic_storage::DynamicStorage, @@ -664,48 +663,43 @@ impl Storage { // The number of shards is much larger than the number of threads, so the effect of the // locks held is negligible. - self.modified - .shards() - .par_iter() - .with_max_len(1) - .map(|shard| { - let mut direct_snapshots: Vec<(TaskId, Box)> = Vec::new(); - let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new(); - { - // Take the snapshots from the modified map - let guard = shard.write(); - // Safety: guard must outlive the iterator. - for bucket in unsafe { guard.iter() } { - // Safety: the guard guarantees that the bucket is not removed and the ptr - // is valid. - let (key, shared_value) = unsafe { bucket.as_mut() }; - let modified_state = shared_value.get_mut(); - match modified_state { - ModifiedState::Modified => { - modified.push(*key); - } - ModifiedState::Snapshot(snapshot) => { - if let Some(snapshot) = snapshot.take() { - direct_snapshots.push((*key, snapshot)); - } + parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| { + let mut direct_snapshots: Vec<(TaskId, Box)> = Vec::new(); + let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new(); + { + // Take the snapshots from the modified map + let guard = shard.write(); + // Safety: guard must outlive the iterator. + for bucket in unsafe { guard.iter() } { + // Safety: the guard guarantees that the bucket is not removed and the ptr + // is valid. + let (key, shared_value) = unsafe { bucket.as_mut() }; + let modified_state = shared_value.get_mut(); + match modified_state { + ModifiedState::Modified => { + modified.push(*key); + } + ModifiedState::Snapshot(snapshot) => { + if let Some(snapshot) = snapshot.take() { + direct_snapshots.push((*key, snapshot)); } } } - // Safety: guard must outlive the iterator. - drop(guard); } + // Safety: guard must outlive the iterator. 
+ drop(guard); + } - SnapshotShard { - direct_snapshots, - modified, - storage: self, - guard: Some(guard.clone()), - process, - preprocess, - process_snapshot, - } - }) - .collect::>() + SnapshotShard { + direct_snapshots, + modified, + storage: self, + guard: Some(guard.clone()), + process, + preprocess, + process_snapshot, + } + }) } /// Start snapshot mode. diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs similarity index 89% rename from turbopack/crates/turbo-tasks-backend/src/database/turbo.rs rename to turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs index be6c5ee684a83..f2934b3c857ad 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs @@ -1,21 +1,21 @@ -use std::{ - cmp::max, - path::PathBuf, - sync::Arc, - thread::{JoinHandle, available_parallelism, spawn}, -}; +use std::{cmp::max, path::PathBuf, sync::Arc, thread::available_parallelism}; use anyhow::{Ok, Result}; use parking_lot::Mutex; +use tokio::{runtime::Handle, task::block_in_place}; use turbo_persistence::{ ArcSlice, CompactConfig, KeyBase, StoreKey, TurboPersistence, ValueBuffer, }; +use turbo_tasks::{JoinHandle, spawn}; use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, + turbo::parallel_scheduler::TurboTasksParallelScheduler, write_batch::{BaseWriteBatch, ConcurrentWriteBatch, WriteBatch, WriteBuffer}, }; +mod parallel_scheduler; + const MB: u64 = 1024 * 1024; const COMPACT_CONFIG: CompactConfig = CompactConfig { min_merge_count: 3, @@ -28,7 +28,7 @@ const COMPACT_CONFIG: CompactConfig = CompactConfig { }; pub struct TurboKeyValueDatabase { - db: Arc, + db: Arc>, compact_join_handle: Mutex>>>, is_ci: bool, is_short_session: bool, @@ -84,7 +84,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase { ) -> Result, Self::ConcurrentWriteBatch<'_>>> { // Wait for the compaction to finish if let Some(join_handle) = self.compact_join_handle.lock().take() { - join_handle.join().unwrap()?; + join(join_handle)?; } // Start a new write batch Ok(WriteBatch::concurrent(TurboWriteBatch { @@ -100,7 +100,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase { fn shutdown(&self) -> Result<()> { // Wait for the compaction to finish if let Some(join_handle) = self.compact_join_handle.lock().take() { - join_handle.join().unwrap()?; + join(join_handle)?; } // Compact the database on shutdown self.db.compact(&CompactConfig { @@ -118,8 +118,8 @@ impl KeyValueDatabase for TurboKeyValueDatabase { } pub struct TurboWriteBatch<'a> { - batch: turbo_persistence::WriteBatch, 5>, - db: &'a Arc, + batch: turbo_persistence::WriteBatch, TurboTasksParallelScheduler, 5>, + db: &'a Arc>, compact_join_handle: Option<&'a Mutex>>>>, } @@ -144,7 +144,7 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> { if let Some(compact_join_handle) = self.compact_join_handle { // Start a new compaction in the background let db = self.db.clone(); - let handle = spawn(move || { + let handle = spawn(async move { db.compact(&CompactConfig { max_merge_segment_count: available_parallelism() .map_or(4, |c| max(4, c.get() / 2)), @@ -220,3 +220,7 @@ impl<'l> From> for ValueBuffer<'l> { } } } + +fn join(handle: JoinHandle>) -> Result<()> { + block_in_place(|| Handle::current().block_on(handle)) +} diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs new file 
mode 100644 index 0000000000000..dd78022c3640b --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs @@ -0,0 +1,76 @@ +use turbo_persistence::ParallelScheduler; +use turbo_tasks::parallel; + +#[derive(Clone, Copy, Default)] +pub struct TurboTasksParallelScheduler; + +impl ParallelScheduler for TurboTasksParallelScheduler { + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) + where + T: Sync, + { + parallel::for_each(items, f); + } + + fn try_parallel_for_each<'l, T, E>( + &self, + items: &'l [T], + f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Sync, + E: Send + 'static, + { + parallel::try_for_each(items, f) + } + + fn try_parallel_for_each_mut<'l, T, E>( + &self, + items: &'l mut [T], + f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Send + Sync, + E: Send + 'static, + { + parallel::try_for_each_mut(items, f) + } + + fn try_parallel_for_each_owned( + &self, + items: Vec, + f: impl (Fn(T) -> Result<(), E>) + Send + Sync, + ) -> Result<(), E> + where + T: Send + Sync, + E: Send + 'static, + { + parallel::try_for_each_owned(items, f) + } + + fn parallel_map_collect<'l, T, I, R>( + &self, + items: &'l [T], + f: impl Fn(&'l T) -> I + Send + Sync, + ) -> R + where + T: Sync, + I: Send + Sync + 'l, + R: FromIterator, + { + parallel::map_collect(items, f) + } + + fn parallel_map_collect_owned( + &self, + items: Vec, + f: impl Fn(T) -> I + Send + Sync, + ) -> R + where + T: Send + Sync, + I: Send + Sync, + R: FromIterator, + { + parallel::map_collect_owned(items, f) + } +} diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index c4b84310d651f..3127a15ab7066 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -1,21 +1,18 @@ use std::{ borrow::Borrow, - cmp::max, env, path::PathBuf, sync::{Arc, LazyLock, Mutex, PoisonError, Weak}, }; use anyhow::{Context, Result, anyhow}; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; -use tracing::Span; use turbo_tasks::{ SessionId, TaskId, backend::CachedTaskType, panic_hooks::{PanicHookGuard, register_panic_hook}, - turbo_tasks_scope, + parallel, }; use crate::{ @@ -331,14 +328,15 @@ impl BackingStorageSealed let _span = tracing::trace_span!("update task data").entered(); process_task_data(snapshots, Some(batch))?; let span = tracing::trace_span!("flush task data").entered(); - [KeySpace::TaskMeta, KeySpace::TaskData] - .into_par_iter() - .try_for_each(|key_space| { + parallel::try_for_each( + &[KeySpace::TaskMeta, KeySpace::TaskData], + |&key_space| { let _span = span.clone().entered(); // Safety: We already finished all processing of the task data and task // meta unsafe { batch.flush(key_space) } - })?; + }, + )?; } let mut next_task_id = get_next_free_task_id::< @@ -352,10 +350,9 @@ impl BackingStorageSealed items = task_cache_updates.iter().map(|m| m.len()).sum::() ) .entered(); - let result = task_cache_updates - .into_par_iter() - .with_max_len(1) - .map(|updates| { + let result = parallel::map_collect_owned::<_, _, Result>>( + task_cache_updates, + |updates| { let _span = _span.clone().entered(); let mut max_task_id = 0; @@ -390,15 +387,11 @@ impl BackingStorageSealed } Ok(max_task_id) - }) - .reduce( - || Ok(0), - |a, b| -> 
anyhow::Result<_> { - let a_max = a?; - let b_max = b?; - Ok(max(a_max, b_max)) - }, - )?; + }, + )? + .into_iter() + .max() + .unwrap_or(0); next_task_id = next_task_id.max(result); } @@ -410,64 +403,11 @@ impl BackingStorageSealed )?; } WriteBatch::Serial(batch) => { - let mut task_items_result = Ok(Vec::new()); - turbo_tasks::scope(|s| { - s.spawn(|_| { - task_items_result = - process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>); - }); - - let mut next_task_id = - get_next_free_task_id::< - T::SerialWriteBatch<'_>, - T::ConcurrentWriteBatch<'_>, - >(&mut WriteBatchRef::serial(batch))?; - - { - let _span = tracing::trace_span!( - "update task cache", - items = task_cache_updates.iter().map(|m| m.len()).sum::() - ) - .entered(); - let mut task_type_bytes = Vec::new(); - for (task_type, task_id) in task_cache_updates.into_iter().flatten() { - let task_id = *task_id; - serialize_task_type(&task_type, &mut task_type_bytes, task_id)?; - - batch - .put( - KeySpace::ForwardTaskCache, - WriteBuffer::Borrowed(&task_type_bytes), - WriteBuffer::Borrowed(&task_id.to_le_bytes()), - ) - .with_context(|| { - anyhow!("Unable to write task cache {task_type:?} => {task_id}") - })?; - batch - .put( - KeySpace::ReverseTaskCache, - WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()), - WriteBuffer::Borrowed(&task_type_bytes), - ) - .with_context(|| { - anyhow!("Unable to write task cache {task_id} => {task_type:?}") - })?; - next_task_id = next_task_id.max(task_id + 1); - } - } - - save_infra::, T::ConcurrentWriteBatch<'_>>( - &mut WriteBatchRef::serial(batch), - next_task_id, - session_id, - operations, - )?; - anyhow::Ok(()) - })?; - { let _span = tracing::trace_span!("update tasks").entered(); - for (task_id, meta, data) in task_items_result?.into_iter().flatten() { + let task_items = + process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?; + for (task_id, meta, data) in task_items.into_iter().flatten() { let key = IntKey::new(*task_id); let key = key.as_ref(); if let Some(meta) = meta { @@ -485,7 +425,54 @@ impl BackingStorageSealed })?; } } + batch.flush(KeySpace::TaskMeta)?; + batch.flush(KeySpace::TaskData)?; + } + + let mut next_task_id = get_next_free_task_id::< + T::SerialWriteBatch<'_>, + T::ConcurrentWriteBatch<'_>, + >(&mut WriteBatchRef::serial(batch))?; + + { + let _span = tracing::trace_span!( + "update task cache", + items = task_cache_updates.iter().map(|m| m.len()).sum::() + ) + .entered(); + let mut task_type_bytes = Vec::new(); + for (task_type, task_id) in task_cache_updates.into_iter().flatten() { + let task_id = *task_id; + serialize_task_type(&task_type, &mut task_type_bytes, task_id)?; + + batch + .put( + KeySpace::ForwardTaskCache, + WriteBuffer::Borrowed(&task_type_bytes), + WriteBuffer::Borrowed(&task_id.to_le_bytes()), + ) + .with_context(|| { + anyhow!("Unable to write task cache {task_type:?} => {task_id}") + })?; + batch + .put( + KeySpace::ReverseTaskCache, + WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()), + WriteBuffer::Borrowed(&task_type_bytes), + ) + .with_context(|| { + anyhow!("Unable to write task cache {task_id} => {task_type:?}") + })?; + next_task_id = next_task_id.max(task_id + 1); + } } + + save_infra::, T::ConcurrentWriteBatch<'_>>( + &mut WriteBatchRef::serial(batch), + next_task_id, + session_id, + operations, + )?; } } @@ -703,48 +690,38 @@ where > + Send + Sync, { - let span = Span::current(); - let turbo_tasks = turbo_tasks::turbo_tasks(); - let handle = tokio::runtime::Handle::current(); - tasks - .into_par_iter() - 
.map(|tasks| { - let _span = span.clone().entered(); - let _guard = handle.clone().enter(); - turbo_tasks_scope(turbo_tasks.clone(), || { - let mut result = Vec::new(); - for (task_id, meta, data) in tasks { - if let Some(batch) = batch { - let key = IntKey::new(*task_id); - let key = key.as_ref(); - if let Some(meta) = meta { - batch.put( - KeySpace::TaskMeta, - WriteBuffer::Borrowed(key), - WriteBuffer::SmallVec(meta), - )?; - } - if let Some(data) = data { - batch.put( - KeySpace::TaskData, - WriteBuffer::Borrowed(key), - WriteBuffer::SmallVec(data), - )?; - } - } else { - // Store the new task data - result.push(( - task_id, - meta.map(WriteBuffer::SmallVec), - data.map(WriteBuffer::SmallVec), - )); - } + parallel::map_collect_owned::<_, _, Result>>(tasks, |tasks| { + let mut result = Vec::new(); + for (task_id, meta, data) in tasks { + if let Some(batch) = batch { + let key = IntKey::new(*task_id); + let key = key.as_ref(); + if let Some(meta) = meta { + batch.put( + KeySpace::TaskMeta, + WriteBuffer::Borrowed(key), + WriteBuffer::SmallVec(meta), + )?; + } + if let Some(data) = data { + batch.put( + KeySpace::TaskData, + WriteBuffer::Borrowed(key), + WriteBuffer::SmallVec(data), + )?; } + } else { + // Store the new task data + result.push(( + task_id, + meta.map(WriteBuffer::SmallVec), + data.map(WriteBuffer::SmallVec), + )); + } + } - Ok(result) - }) - }) - .collect::>>() + Ok(result) + }) } fn serialize(task: TaskId, data: &Vec) -> Result> { diff --git a/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs b/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs index f9321cfd797fb..add31c32ecd35 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn all_in_one() { run(®ISTRATION, || async { let a: Vc = Vc::cell(4242); diff --git a/turbopack/crates/turbo-tasks-backend/tests/basic.rs b/turbopack/crates/turbo-tasks-backend/tests/basic.rs index a12da0b8578d8..a22cb96ade456 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/basic.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/basic.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn basic() { run(®ISTRATION, || async { let output1 = func_without_args(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/bug.rs b/turbopack/crates/turbo-tasks-backend/tests/bug.rs index f7e8097a1b7aa..5d225bdb8c48e 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/bug.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/bug.rs @@ -24,7 +24,7 @@ struct TaskSpec { #[turbo_tasks::value(transparent)] struct TasksSpec(Vec); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn graph_bug() { // see https://github.com/vercel/next.js/pull/79451 run(®ISTRATION, || async { diff --git a/turbopack/crates/turbo-tasks-backend/tests/bug2.rs b/turbopack/crates/turbo-tasks-backend/tests/bug2.rs index df3115b8aa3da..a1495eeeca91b 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/bug2.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/bug2.rs @@ -33,7 +33,7 @@ pub struct TaskSpec { #[turbo_tasks::value(transparent)] struct Iteration(State); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn graph_bug() { run(®ISTRATION, 
move || async move { let spec = vec![ diff --git a/turbopack/crates/turbo-tasks-backend/tests/call_types.rs b/turbopack/crates/turbo-tasks-backend/tests/call_types.rs index 17875d2630d78..f06430ada2bd0 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/call_types.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/call_types.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn functions() { run(®ISTRATION, || async { assert_eq!(*fn_plain().await?, 42); @@ -53,7 +53,7 @@ async fn async_fn_vc_arg(n: Vc) -> Result> { Ok(Vc::cell(*n.await?)) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn methods() { run(®ISTRATION, || async { assert_eq!(*Value::static_method().await?, 42); @@ -106,7 +106,7 @@ impl Value { } } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn trait_methods() { run(®ISTRATION, || async { assert_eq!(*Value::static_trait_method().await?, 42); diff --git a/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs b/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs index a86c0e09343d0..945845a86e3a2 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs @@ -14,7 +14,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn transitive_emitting() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!("")); @@ -32,7 +32,7 @@ async fn transitive_emitting() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn transitive_emitting_indirect() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!("")); @@ -50,7 +50,7 @@ async fn transitive_emitting_indirect() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn multi_emitting() { run(®ISTRATION, || async { let result_op = my_multi_emitting_function(); @@ -68,7 +68,7 @@ async fn multi_emitting() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn taking_collectibles() { run(®ISTRATION, || async { let result_op = my_collecting_function(); @@ -84,7 +84,7 @@ async fn taking_collectibles() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn taking_collectibles_extra_layer() { run(®ISTRATION, || async { let result_op = my_collecting_function_indirect(); @@ -100,7 +100,7 @@ async fn taking_collectibles_extra_layer() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn taking_collectibles_parallel() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!("a")); @@ -142,7 +142,7 @@ async fn taking_collectibles_parallel() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn taking_collectibles_with_resolve() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function_with_resolve(rcstr!("resolve")); diff --git a/turbopack/crates/turbo-tasks-backend/tests/debug.rs b/turbopack/crates/turbo-tasks-backend/tests/debug.rs index 854d57b234395..ccc833eeb85d8 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/debug.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/debug.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: 
Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn primitive_debug() { run(®ISTRATION, || async { let a: Vc = Vc::cell(42); @@ -20,7 +20,7 @@ async fn primitive_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn transparent_debug() { run(®ISTRATION, || async { let a: Vc = Transparent(42).cell(); @@ -32,7 +32,7 @@ async fn transparent_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn enum_none_debug() { run(®ISTRATION, || async { let a: Vc = Enum::None.cell(); @@ -44,7 +44,7 @@ async fn enum_none_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn enum_transparent_debug() { run(®ISTRATION, || async { let a: Vc = Enum::Transparent(Transparent(42).resolved_cell()).cell(); @@ -60,7 +60,7 @@ async fn enum_transparent_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn enum_inner_vc_debug() { run(®ISTRATION, || async { let a: Vc = Enum::Enum(Enum::None.resolved_cell()).cell(); @@ -76,7 +76,7 @@ async fn enum_inner_vc_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn struct_unit_debug() { run(®ISTRATION, || async { let a: Vc = StructUnit.cell(); @@ -87,7 +87,7 @@ async fn struct_unit_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn struct_transparent_debug() { run(®ISTRATION, || async { let a: Vc = StructWithTransparent { @@ -106,7 +106,7 @@ async fn struct_transparent_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn struct_vec_debug() { run(®ISTRATION, || async { let a: Vc = StructWithVec { vec: vec![] }.cell(); @@ -135,7 +135,7 @@ async fn struct_vec_debug() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn struct_ignore_debug() { run(®ISTRATION, || async { let a: Vc = StructWithIgnore { diff --git a/turbopack/crates/turbo-tasks-backend/tests/detached.rs b/turbopack/crates/turbo-tasks-backend/tests/detached.rs index c76c23590f8ab..b1c80929fad6a 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/detached.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/detached.rs @@ -15,7 +15,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_spawns_detached() -> anyhow::Result<()> { run(®ISTRATION, || async { // HACK: The watch channel we use has an incorrect implementation of `TraceRawVcs`, just @@ -82,7 +82,7 @@ async fn spawns_detached( Vc::cell(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_spawns_detached_changing() -> anyhow::Result<()> { run(®ISTRATION, || async { // HACK: The watch channel we use has an incorrect implementation of `TraceRawVcs` diff --git a/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs b/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs index 8171cead7dd40..89aa8998fae80 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs @@ -11,7 +11,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn dirty_in_progress() { run(®ISTRATION, || async { let cases = [ diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs 
b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs index 4a3ddce3bfa73..87c2d6672e468 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn recompute() { run(®ISTRATION, || async { let input = ChangingInput { diff --git a/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs b/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs index b7081174940c6..3193382110215 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn filtered_trait_method_args() -> Result<()> { run(®ISTRATION, || async { let uses_arg = UsesArg.cell(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/immutable.rs b/turbopack/crates/turbo-tasks-backend/tests/immutable.rs index d90a4cb2f78de..0c716c7544744 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/immutable.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/immutable.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn hidden_mutate() { run(®ISTRATION, || async { let input = create_input().resolve().await?; diff --git a/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs b/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs index e2a6a7abdfa74..f66363d374635 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_local_task_id() -> Result<()> { run(®ISTRATION, || async { let local_vc = get_local_task_id(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs b/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs index 8000ddc8b26e3..457971d0667c7 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs @@ -26,7 +26,7 @@ fn use_operations() -> Vc { forty_two.connect() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_use_operations() -> Result<()> { run(®ISTRATION, || async { assert_eq!(*use_operations().await?, 42); diff --git a/turbopack/crates/turbo-tasks-backend/tests/panics.rs b/turbopack/crates/turbo-tasks-backend/tests/panics.rs index d321e825f1430..8b9458ab4f046 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/panics.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/panics.rs @@ -25,7 +25,7 @@ static FILE_PATH_REGEX: LazyLock = // // This test depends on the process-wide global panic handler. This test must be run in its own // process in isolation of any other tests. 
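A note on the recurring test attribute change in the files above and below: the new helpers wait for spawned work with `tokio::task::block_in_place`, which panics on a current-thread runtime, so these tests switch to the multi-thread flavor. The `join` helper added to `turbo/mod.rs` earlier in this diff is a typical caller of that pattern. A plain-tokio sketch of it (using tokio's own `JoinHandle` rather than `turbo_tasks::JoinHandle`, hence the extra `?` for the `JoinError`; the names here are illustrative):

```rust
use anyhow::Result;
use tokio::{
    runtime::Handle,
    task::{JoinHandle, block_in_place},
};

// Wait for a spawned async task from synchronous code: block_in_place tells the
// runtime this worker thread is about to block, then block_on drives the handle
// to completion. Both require a multi-thread runtime.
fn join(handle: JoinHandle<Result<()>>) -> Result<()> {
    block_in_place(|| Handle::current().block_on(handle))?
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
    let handle = tokio::spawn(async { anyhow::Ok(()) });
    join(handle)
}
```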
-#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_panic_hook() { let prev_hook = take_hook(); set_hook(Box::new(move |info| { diff --git a/turbopack/crates/turbo-tasks-backend/tests/performance.rs b/turbopack/crates/turbo-tasks-backend/tests/performance.rs index 904843fad2a63..13b76582af633 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/performance.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/performance.rs @@ -142,7 +142,7 @@ fn check_skip() -> bool { false } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_many_children() { if check_skip() { return; @@ -157,7 +157,7 @@ async fn many_calls_to_many_children() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_uncached_many_children() { if check_skip() { return; @@ -189,7 +189,7 @@ fn run_big_graph_test(counts: Vec) -> impl Future> + Se ) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_1() { if check_skip() { return; @@ -199,7 +199,7 @@ async fn many_calls_to_big_graph_1() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_2() { if check_skip() { return; @@ -211,7 +211,7 @@ async fn many_calls_to_big_graph_2() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_3() { if check_skip() { return; @@ -221,7 +221,7 @@ async fn many_calls_to_big_graph_3() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_4() { if check_skip() { return; @@ -231,7 +231,7 @@ async fn many_calls_to_big_graph_4() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_5() { if check_skip() { return; @@ -243,7 +243,7 @@ async fn many_calls_to_big_graph_5() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_6() { if check_skip() { return; @@ -255,7 +255,7 @@ async fn many_calls_to_big_graph_6() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_7() { if check_skip() { return; @@ -270,7 +270,7 @@ async fn many_calls_to_big_graph_7() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_8() { if check_skip() { return; @@ -282,7 +282,7 @@ async fn many_calls_to_big_graph_8() { .unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn many_calls_to_big_graph_9() { if check_skip() { return; diff --git a/turbopack/crates/turbo-tasks-backend/tests/random_change.rs b/turbopack/crates/turbo-tasks-backend/tests/random_change.rs index 841c4564af444..089490ab1c79c 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/random_change.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/random_change.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn random_change() { run(®ISTRATION, || async { let state = make_state(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs b/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs index d7ccf3b37b6cf..66c51c9e4f1ad 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs @@ -10,7 +10,7 @@ use turbo_tasks_testing::{Registration, register, run}; static 
REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn read_ref() { run(®ISTRATION, || async { let counter = Counter::cell(Counter { diff --git a/turbopack/crates/turbo-tasks-backend/tests/recompute.rs b/turbopack/crates/turbo-tasks-backend/tests/recompute.rs index 17a69e9c151d3..dcad783b06e08 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/recompute.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/recompute.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn recompute() { run(®ISTRATION, || async { let input = ChangingInput { @@ -58,7 +58,7 @@ async fn recompute() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn immutable_analysis() { run(®ISTRATION, || async { let input = ChangingInput { diff --git a/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs b/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs index 54074af628add..d7c0be301ac70 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn recompute() { run(®ISTRATION, || async { let input = ChangingInput::new(1).resolve().await?; diff --git a/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs b/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs index da3a69ca62dce..a0b9914b7f8bb 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs @@ -23,7 +23,7 @@ fn assert_resolved(input: ResolvedVc) { assert!(input_vc.is_resolved()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_conversion() -> Result<()> { run(®ISTRATION, || async { let unresolved: Vc = Vc::cell(42); @@ -38,7 +38,7 @@ async fn test_conversion() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_cell_construction() -> Result<()> { run(®ISTRATION, || async { let a: ResolvedVc = ResolvedVc::cell(42); @@ -50,7 +50,7 @@ async fn test_cell_construction() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_resolved_vc_as_arg() -> Result<()> { run(®ISTRATION, || async { let unresolved: Vc = returns_int(42); @@ -62,7 +62,7 @@ async fn test_resolved_vc_as_arg() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_into_future() -> Result<()> { run(®ISTRATION, || async { let mut resolved = ResolvedVc::cell(42); @@ -78,7 +78,7 @@ async fn test_into_future() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_sidecast() -> Result<()> { run(®ISTRATION, || async { let concrete_value = ImplementsAAndB.resolved_cell(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs b/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs index 524a78950acf2..dc82e82174de5 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs @@ -11,7 +11,7 @@ static REGISTRATION: Registration = register!(); #[turbo_tasks::value(transparent)] struct Wrapper(Vec); -#[tokio::test] 
+#[tokio::test(flavor = "multi_thread")] async fn test_shrink_to_fit() -> Result<()> { run(®ISTRATION, || async { // `Vec::shrink_to_fit` is implicitly called when a cell is constructed. diff --git a/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs b/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs index 8a391ace095aa..869c944bcb5c7 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs @@ -13,7 +13,7 @@ use turbo_tasks_testing::{Registration, register, run_without_cache_check}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_simple_task() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -39,7 +39,7 @@ async fn test_simple_task() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_await_same_vc_multiple_times() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -61,7 +61,7 @@ async fn test_await_same_vc_multiple_times() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_vc_receiving_task() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -93,7 +93,7 @@ async fn test_vc_receiving_task() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_trait_methods() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -130,7 +130,7 @@ async fn test_trait_methods() -> Result<()> { .await } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_dyn_trait_methods() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -174,7 +174,7 @@ async fn test_dyn_trait_methods() -> Result<()> { } // creates Vcs, but doesn't ever execute them -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_no_execution() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs b/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs index 74c21fcaebb65..f553a83a52c5b 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs @@ -18,7 +18,7 @@ Adder::add_method (read cell of type turbo-tasks@TODO::::primitives::u64) unknown transient task (read cell of type turbo-tasks@TODO::::primitives::u16) unknown transient task (read cell of type turbo-tasks@TODO::::primitives::u32)"; -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_trace_transient() { let result = run_without_cache_check(®ISTRATION, async { read_incorrect_task_input_operation(IncorrectTaskInput( diff --git a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs index c556e8d422489..2372947303360 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs @@ -10,7 +10,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn trait_ref() { run(®ISTRATION, || async { let counter = Counter::cell(Counter { diff --git a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs 
b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs index 15917f62563bf..3b8d1cb15c02a 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs @@ -9,7 +9,7 @@ static REGISTRATION: Registration = register!(); // Test that with `cell = "shared"`, the cell will be re-used as long as the // value is equal. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_trait_ref_shared_cell_mode() { run(®ISTRATION, || async { let input = CellIdSelector { @@ -44,7 +44,7 @@ async fn test_trait_ref_shared_cell_mode() { // Test that with `cell = "new"`, the cell will is never re-used, even if the // value is equal. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_trait_ref_new_cell_mode() { run(®ISTRATION, || async { let input = CellIdSelector { diff --git a/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs b/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs index 216e8a285dbf8..b144319ed4763 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs @@ -10,7 +10,7 @@ static REGISTRATION: Registration = register!(); const EXPECTED_MSG: &str = "Collectible is transient, transient collectibles cannot be emitted from persistent tasks"; -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_transient_emit_from_persistent() { let result = run_without_cache_check(®ISTRATION, async { emit_incorrect_task_input_operation(IncorrectTaskInput(U32Wrapper(123).resolved_cell())) diff --git a/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs b/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs index 7db072310c915..100008c755c5c 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs @@ -7,7 +7,7 @@ use turbo_tasks_testing::{Registration, register, run_without_cache_check}; static REGISTRATION: Registration = register!(); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_transient_vc() -> Result<()> { run_without_cache_check(®ISTRATION, async { test_transient_operation(TransientValue::new(123)) diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index a325eefa0f445..b44f2a3a00522 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -18,7 +18,7 @@ static REGISTRATION: Registration = register!(turbo_tasks_fetch::register); /// acquire and hold this lock to prevent potential flakiness. static GLOBAL_TEST_LOCK: TokioMutex<()> = TokioMutex::const_new(()); -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn basic_get() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -49,7 +49,7 @@ async fn basic_get() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn sends_user_agent() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -85,7 +85,7 @@ async fn sends_user_agent() { // This is temporary behavior. // TODO: Implement invalidation that respects Cache-Control headers. 
-#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn invalidation_does_not_invalidate() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -130,7 +130,7 @@ fn get_issue_context() -> Vc { DiskFileSystem::new(rcstr!("root"), rcstr!("/")).root() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn errors_on_failed_connection() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -161,7 +161,7 @@ async fn errors_on_failed_connection() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn errors_on_404() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -196,7 +196,7 @@ async fn errors_on_404() { .unwrap() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn client_cache() { // a simple fetch that should always succeed async fn simple_fetch(path: &str, client: FetchClient) -> anyhow::Result<()> { diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 9a17774a49c37..2ae2dc3036f0c 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -46,7 +46,6 @@ use dunce::simplified; use indexmap::IndexSet; use jsonc_parser::{ParseOptions, parse_to_serde_value}; use mime::Mime; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -56,7 +55,7 @@ use turbo_rcstr::{RcStr, rcstr}; use turbo_tasks::{ ApplyEffectsContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, TaskInput, ValueToString, Vc, debug::ValueDebugFormat, effect, - mark_session_dependent, mark_stateful, trace::TraceRawVcs, + mark_session_dependent, mark_stateful, parallel, trace::TraceRawVcs, }; use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, hash_xxh3_hash64}; use turbo_unix_path::{ @@ -309,19 +308,14 @@ impl DiskFileSystemInner { fn invalidate(&self) { let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered(); - let span = tracing::Span::current(); - let handle = tokio::runtime::Handle::current(); let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap()); let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap()); - let iter = invalidator_map - .into_par_iter() - .chain(dir_invalidator_map.into_par_iter()) - .flat_map(|(_, invalidators)| invalidators.into_par_iter()); - iter.for_each(|(i, _)| { - let _span = span.clone().entered(); - let _guard = handle.enter(); - i.invalidate() - }); + let invalidators = invalidator_map + .into_iter() + .chain(dir_invalidator_map) + .flat_map(|(_, invalidators)| invalidators.into_keys()) + .collect::>(); + parallel::for_each_owned(invalidators, |invalidator| invalidator.invalidate()); } /// Invalidates every tracked file in the filesystem. 
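The `invalidate` rewrite above replaces rayon's `into_par_iter().for_each(...)` and the manual span/runtime-handle juggling with a two-step pattern: drain the invalidator maps into an owned `Vec`, then fan the owned items out with `parallel::for_each_owned`. A stand-alone sketch of that pattern (`Invalidator` and `invalidate_all` are illustrative stand-ins; only `parallel::for_each_owned` comes from this change, and it must be called from inside a multi-thread tokio runtime):

```rust
use std::collections::HashMap;

use turbo_tasks::parallel;

// Illustrative stand-in for the real invalidator type; `invalidate` consumes it.
struct Invalidator(u32);

impl Invalidator {
    fn invalidate(self) {
        let _ = self.0;
    }
}

fn invalidate_all(map: HashMap<String, Vec<Invalidator>>) {
    // Step 1: drain the maps into a flat, owned list of work items.
    let invalidators: Vec<Invalidator> = map.into_values().flatten().collect();
    // Step 2: fan out; for_each_owned chunks the Vec, runs one tokio task per
    // chunk, and blocks the current thread (via block_in_place) until all finish.
    parallel::for_each_owned(invalidators, |invalidator| invalidator.invalidate());
}
```

Collecting first means the worker closures only see owned `Send + Sync` items, so no tokio handle or tracing span has to be re-entered inside the loop the way the removed rayon code did.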
@@ -332,23 +326,19 @@ impl DiskFileSystemInner { reason: impl Fn(&Path) -> R + Sync, ) { let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered(); - let span = tracing::Span::current(); - let handle = tokio::runtime::Handle::current(); let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap()); let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap()); - let iter = invalidator_map - .into_par_iter() - .chain(dir_invalidator_map.into_par_iter()) + let invalidators = invalidator_map + .into_iter() + .chain(dir_invalidator_map) .flat_map(|(path, invalidators)| { - let _span = span.clone().entered(); let reason_for_path = reason(&path); invalidators - .into_par_iter() + .into_keys() .map(move |i| (reason_for_path.clone(), i)) - }); - iter.for_each(|(reason, (invalidator, _))| { - let _span = span.clone().entered(); - let _guard = handle.enter(); + }) + .collect::>(); + parallel::for_each_owned(invalidators, |(reason, invalidator)| { invalidator.invalidate_with_reason(reason) }); } diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index e39a7a022d700..a9f58e0f2d6ca 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -16,13 +16,12 @@ use notify::{ Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, event::{MetadataKind, ModifyKind, RenameMode}, }; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use tracing::instrument; use turbo_rcstr::RcStr; use turbo_tasks::{ - FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, spawn_thread, + FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, parallel, spawn_thread, util::StaticOrArc, }; @@ -397,40 +396,30 @@ impl DiskWatcher { // // Best is to start_watching before starting to read { - let span = tracing::info_span!("invalidate filesystem"); - let _span = span.clone().entered(); + let _span = tracing::info_span!("invalidate filesystem").entered(); let invalidator_map = take(&mut *fs_inner.invalidator_map.lock().unwrap()); let dir_invalidator_map = take(&mut *fs_inner.dir_invalidator_map.lock().unwrap()); - let iter = invalidator_map - .into_par_iter() - .chain(dir_invalidator_map.into_par_iter()); - let handle = tokio::runtime::Handle::current(); + let iter = invalidator_map.into_iter().chain(dir_invalidator_map); if report_invalidation_reason { - iter.flat_map(|(path, invalidators)| { - let _span = span.clone().entered(); - let reason = WatchStart { - name: fs_inner.name.clone(), - // this path is just used for display purposes - path: RcStr::from(path.to_string_lossy()), - }; - invalidators - .into_par_iter() - .map(move |i| (reason.clone(), i)) - }) - .for_each(|(reason, (invalidator, _))| { - let _span = span.clone().entered(); - let _guard = handle.enter(); - invalidator.invalidate_with_reason(reason) + let invalidators = iter + .flat_map(|(path, invalidators)| { + let reason = WatchStart { + name: fs_inner.name.clone(), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), + }; + invalidators.into_iter().map(move |i| (reason.clone(), i)) + }) + .collect::>(); + parallel::for_each_owned(invalidators, |(reason, (invalidator, _))| { + invalidator.invalidate_with_reason(reason); }); } else { - iter.flat_map(|(_, invalidators)| { - let _span = span.clone().entered(); - 
invalidators.into_par_iter().map(move |i| i) - }) - .for_each(|(invalidator, _)| { - let _span = span.clone().entered(); - let _guard = handle.enter(); - invalidator.invalidate() + let invalidators = iter + .flat_map(|(_, invalidators)| invalidators.into_keys()) + .collect::>(); + parallel::for_each_owned(invalidators, |invalidator| { + invalidator.invalidate(); }); } } diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index fc68a0cea6e45..841e4b1f8c455 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -37,6 +37,8 @@ #![feature(never_type)] #![feature(downcast_unchecked)] #![feature(ptr_metadata)] +#![feature(sync_unsafe_cell)] +#![feature(vec_into_raw_parts)] pub mod backend; mod capture_future; @@ -64,13 +66,14 @@ mod no_move_vec; mod once_map; mod output; pub mod panic_hooks; +pub mod parallel; pub mod persisted_graph; pub mod primitives; mod raw_vc; mod read_options; mod read_ref; pub mod registry; -mod scope; +pub mod scope; mod serialization_invalidation; pub mod small_duration; mod spawn; @@ -115,7 +118,6 @@ pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError}; pub use read_options::ReadCellOptions; pub use read_ref::ReadRef; use rustc_hash::FxHasher; -pub use scope::scope; pub use serialization_invalidation::SerializationInvalidator; pub use shrink_to_fit::ShrinkToFit; pub use spawn::{JoinHandle, spawn, spawn_blocking, spawn_thread}; diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index b7f2cfc96b4c6..393c5224da390 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -1058,27 +1058,30 @@ impl TurboTasks { } pub async fn stop_and_wait(&self) { - self.backend.stopping(self); - self.stopped.store(true, Ordering::Release); - { - let listener = self - .event - .listen_with_note(|| || "wait for stop".to_string()); - if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { - listener.await; + turbo_tasks_future_scope(self.pin(), async move { + self.backend.stopping(self); + self.stopped.store(true, Ordering::Release); + { + let listener = self + .event + .listen_with_note(|| || "wait for stop".to_string()); + if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { + listener.await; + } } - } - { - let listener = self.event_background.listen(); - if self - .currently_scheduled_background_jobs - .load(Ordering::Acquire) - != 0 { - listener.await; + let listener = self.event_background.listen(); + if self + .currently_scheduled_background_jobs + .load(Ordering::Acquire) + != 0 + { + listener.await; + } } - } - self.backend.stop(self); + self.backend.stop(self); + }) + .await; } #[track_caller] @@ -1675,6 +1678,10 @@ pub fn turbo_tasks() -> Arc { TURBO_TASKS.with(|arc| arc.clone()) } +pub fn try_turbo_tasks() -> Option> { + TURBO_TASKS.try_with(|arc| arc.clone()).ok() +} + pub fn with_turbo_tasks(func: impl FnOnce(&Arc) -> T) -> T { TURBO_TASKS.with(|arc| func(arc)) } diff --git a/turbopack/crates/turbo-tasks/src/parallel.rs b/turbopack/crates/turbo-tasks/src/parallel.rs new file mode 100644 index 0000000000000..e20b67bf67892 --- /dev/null +++ b/turbopack/crates/turbo-tasks/src/parallel.rs @@ -0,0 +1,308 @@ +//! Parallel for each and map using tokio tasks. +//! +//! This avoid the problem of sleeping threads with mimalloc when using rayon in combination with +//! tokio. It also avoid having multiple thread pools. +//! 
see also https://pwy.io/posts/mimalloc-cigarette/ + +use std::{sync::LazyLock, thread::available_parallelism}; + +use crate::{scope::scope_and_block, util::into_chunks}; + +/// Calculates a good chunk size for parallel processing based on the number of available threads. +/// This is used to ensure that the workload is evenly distributed across the threads. +fn good_chunk_size(len: usize) -> usize { + static GOOD_CHUNK_COUNT: LazyLock = + LazyLock::new(|| available_parallelism().map_or(16, |c| c.get() * 4)); + let min_chunk_count = *GOOD_CHUNK_COUNT; + len.div_ceil(min_chunk_count) +} + +pub fn for_each<'l, T, F>(items: &'l [T], f: F) +where + T: Sync, + F: Fn(&'l T) + Send + Sync, +{ + let len = items.len(); + if len <= 1 { + for item in items { + f(item); + } + return; + } + let chunk_size = good_chunk_size(len); + let f = &f; + let _results = scope_and_block(len.div_ceil(chunk_size), |scope| { + for chunk in items.chunks(chunk_size) { + scope.spawn(async move { + for item in chunk { + f(item); + } + }) + } + }); +} + +pub fn for_each_owned(items: Vec, f: impl Fn(T) + Send + Sync) +where + T: Send + Sync, +{ + let len = items.len(); + if len <= 1 { + for item in items { + f(item); + } + return; + } + let chunk_size = good_chunk_size(len); + let f = &f; + let _results = scope_and_block(len.div_ceil(chunk_size), |scope| { + for chunk in into_chunks(items, chunk_size) { + scope.spawn(async move { + // SAFETY: Even when f() panics we drop all items in the chunk. + for item in chunk { + f(item); + } + }) + } + }); +} + +pub fn try_for_each<'l, T, E>( + items: &'l [T], + f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, +) -> Result<(), E> +where + T: Sync, + E: Send + 'static, +{ + let len = items.len(); + if len <= 1 { + for item in items { + f(item)?; + } + return Ok(()); + } + let chunk_size = good_chunk_size(len); + let f = &f; + scope_and_block(len.div_ceil(chunk_size), |scope| { + for chunk in items.chunks(chunk_size) { + scope.spawn(async move { + for item in chunk { + f(item)?; + } + Ok(()) + }) + } + }) + .collect::>() +} + +pub fn try_for_each_mut<'l, T, E>( + items: &'l mut [T], + f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, +) -> Result<(), E> +where + T: Send + Sync, + E: Send + 'static, +{ + let len = items.len(); + if len <= 1 { + for item in items { + f(item)?; + } + return Ok(()); + } + let chunk_size = good_chunk_size(len); + let f = &f; + scope_and_block(len.div_ceil(chunk_size), |scope| { + for chunk in items.chunks_mut(chunk_size) { + scope.spawn(async move { + for item in chunk { + f(item)?; + } + Ok(()) + }) + } + }) + .collect::>() +} + +pub fn try_for_each_owned( + items: Vec, + f: impl (Fn(T) -> Result<(), E>) + Send + Sync, +) -> Result<(), E> +where + T: Send + Sync, + E: Send + 'static, +{ + let len = items.len(); + if len <= 1 { + for item in items { + f(item)?; + } + return Ok(()); + } + let chunk_size = good_chunk_size(len); + let f = &f; + scope_and_block(len.div_ceil(chunk_size), |scope| { + for chunk in into_chunks(items, chunk_size) { + scope.spawn(async move { + for item in chunk { + f(item)?; + } + Ok(()) + }) + } + }) + .collect::>() +} + +pub fn map_collect<'l, Item, PerItemResult, Result>( + items: &'l [Item], + f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, +) -> Result +where + Item: Sync, + PerItemResult: Send + Sync + 'l, + Result: FromIterator, +{ + let len = items.len(); + if len == 0 { + return Result::from_iter(std::iter::empty()); // No items to process, return empty + // collection + } + let chunk_size = 
good_chunk_size(len); + let f = &f; + scope_and_block(len.div_ceil(chunk_size), |scope| { + for chunk in items.chunks(chunk_size) { + scope.spawn(async move { chunk.iter().map(f).collect::>() }) + } + }) + .flatten() + .collect() +} + +pub fn map_collect_owned<'l, Item, PerItemResult, Result>( + items: Vec, + f: impl Fn(Item) -> PerItemResult + Send + Sync, +) -> Result +where + Item: Send + Sync, + PerItemResult: Send + Sync + 'l, + Result: FromIterator, +{ + let len = items.len(); + if len == 0 { + return Result::from_iter(std::iter::empty()); // No items to process, return empty + // collection; + } + let chunk_size = good_chunk_size(len); + let f = &f; + scope_and_block(len.div_ceil(chunk_size), |scope| { + for chunk in into_chunks(items, chunk_size) { + scope.spawn(async move { chunk.map(f).collect::>() }) + } + }) + .flatten() + .collect() +} + +#[cfg(test)] +mod tests { + use std::{ + panic::{AssertUnwindSafe, catch_unwind}, + sync::atomic::{AtomicI32, Ordering}, + }; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_parallel_for_each() { + let input = vec![1, 2, 3, 4, 5]; + let sum = AtomicI32::new(0); + for_each(&input, |&x| { + sum.fetch_add(x, Ordering::SeqCst); + }); + assert_eq!(sum.load(Ordering::SeqCst), 15); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_parallel_try_for_each() { + let input = vec![1, 2, 3, 4, 5]; + let result = try_for_each(&input, |&x| { + if x % 2 == 0 { + Ok(()) + } else { + Err(format!("Odd number {x} encountered")) + } + }); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "Odd number 1 encountered"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_parallel_try_for_each_mut() { + let mut input = vec![1, 2, 3, 4, 5]; + let result = try_for_each_mut(&mut input, |x| { + *x += 10; + if *x % 2 == 0 { + Ok(()) + } else { + Err(format!("Odd number {} encountered", *x)) + } + }); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "Odd number 11 encountered"); + assert_eq!(input, vec![11, 12, 13, 14, 15]); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_parallel_for_each_owned() { + let input = vec![1, 2, 3, 4, 5]; + let sum = AtomicI32::new(0); + for_each_owned(input, |x| { + sum.fetch_add(x, Ordering::SeqCst); + }); + assert_eq!(sum.load(Ordering::SeqCst), 15); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_parallel_map_collect() { + let input = vec![1, 2, 3, 4, 5]; + let result: Vec<_> = map_collect(&input, |&x| x * 2); + assert_eq!(result, vec![2, 4, 6, 8, 10]); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_parallel_map_collect_owned() { + let input = vec![1, 2, 3, 4, 5]; + let result: Vec<_> = map_collect_owned(input, |x| x * 2); + assert_eq!(result, vec![2, 4, 6, 8, 10]); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_parallel_map_collect_owned_many() { + let input = vec![1; 1000]; + let result: Vec<_> = map_collect_owned(input, |x| x * 2); + assert_eq!(result, vec![2; 1000]); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_panic_in_scope() { + let result = catch_unwind(AssertUnwindSafe(|| { + let mut input = vec![1; 1000]; + input[744] = 2; + for_each(&input, |x| { + if *x == 2 { + panic!("Intentional panic"); + } + }); + panic!("Should not get here") + })); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().downcast_ref::<&str>(), + Some(&"Intentional panic") + ); + } +} diff --git a/turbopack/crates/turbo-tasks/src/scope.rs b/turbopack/crates/turbo-tasks/src/scope.rs index 
bfe5e355df358..4c474b35f22a3 100644 --- a/turbopack/crates/turbo-tasks/src/scope.rs +++ b/turbopack/crates/turbo-tasks/src/scope.rs @@ -1,52 +1,289 @@ -use std::sync::Arc; +//! A scoped tokio spawn implementation that allow a non-'static lifetime for tasks. -use crate::{TurboTasksApi, turbo_tasks, turbo_tasks_scope}; +use std::{ + any::Any, + marker::PhantomData, + panic::{self, AssertUnwindSafe, catch_unwind}, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + thread::{self, Thread}, +}; -/// A wrapper around [`rayon::Scope`] that preserves the [`turbo_tasks_scope`]. -pub struct Scope<'scope, 'a> { - scope: &'a rayon::Scope<'scope>, - handle: tokio::runtime::Handle, - turbo_tasks: Arc, - span: tracing::Span, +use futures::FutureExt; +use parking_lot::Mutex; +use tokio::{runtime::Handle, task::block_in_place}; +use tracing::{Instrument, Span, info_span}; + +use crate::{ + TurboTasksApi, + manager::{try_turbo_tasks, turbo_tasks_future_scope}, +}; + +struct ScopeInner { + main_thread: Thread, + remaining_tasks: AtomicUsize, + /// The first panic that occurred in the tasks, by task index. + /// The usize value is the index of the task. + panic: Mutex, usize)>>, } -impl<'scope> Scope<'scope, '_> { - pub fn spawn(&self, body: Body) +impl ScopeInner { + fn on_task_finished(&self, panic: Option<(Box, usize)>) { + if let Some((err, index)) = panic { + let mut old_panic = self.panic.lock(); + if old_panic.as_ref().is_none_or(|&(_, i)| i > index) { + *old_panic = Some((err, index)); + } + } + if self.remaining_tasks.fetch_sub(1, Ordering::Release) == 1 { + self.main_thread.unpark(); + } + } + + fn wait(&self) { + let _span = info_span!("blocking").entered(); + while self.remaining_tasks.load(Ordering::Acquire) != 0 { + thread::park(); + } + if let Some((err, _)) = self.panic.lock().take() { + panic::resume_unwind(err); + } + } +} + +/// Scope to allow spawning tasks with a limited lifetime. +/// +/// Dropping this Scope will wait for all tasks to complete. +pub struct Scope<'scope, 'env: 'scope, R: Send + 'env> { + results: &'scope [Mutex>], + index: AtomicUsize, + inner: Arc, + handle: Handle, + turbo_tasks: Option>, + span: Span, + /// Invariance over 'env, to make sure 'env cannot shrink, + /// which is necessary for soundness. + /// + /// see https://doc.rust-lang.org/src/std/thread/scoped.rs.html#12-29 + env: PhantomData<&'env mut &'env ()>, +} + +impl<'scope, 'env: 'scope, R: Send + 'env> Scope<'scope, 'env, R> { + /// Creates a new scope. + /// + /// # Safety + /// + /// The caller must ensure `Scope` is dropped and not forgotten. + unsafe fn new(results: &'scope [Mutex>]) -> Self { + Self { + results, + index: AtomicUsize::new(0), + inner: Arc::new(ScopeInner { + main_thread: thread::current(), + remaining_tasks: AtomicUsize::new(0), + panic: Mutex::new(None), + }), + handle: Handle::current(), + turbo_tasks: try_turbo_tasks(), + span: Span::current(), + env: PhantomData, + } + } + + /// Spawns a new task in the scope. 
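Before the `spawn` implementation that follows: the point of this scope, compared to plain `tokio::spawn`, is that spawned tasks may borrow data that is not `'static`, as the module doc above states. The tests further down in this file move owned values into their tasks, so here is a small extra sketch with a borrow; it assumes the `scope_and_block` helper defined later in this file and a multi-thread runtime:

```rust
#[tokio::test(flavor = "multi_thread")]
async fn scope_borrow_sketch() {
    let data = vec![1u32, 2, 3, 4];
    // One task per chunk; each task borrows `data` rather than owning it.
    // scope_and_block waits for every task and yields results in spawn order.
    let results: Vec<u32> = scope_and_block(2, |scope| {
        for chunk in data.chunks(2) {
            scope.spawn(async move { chunk.iter().sum::<u32>() });
        }
    })
    .collect();
    assert_eq!(results, vec![3, 7]);
}
```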
+ pub fn spawn(&self, f: F) where - Body: FnOnce(&Scope<'scope, '_>) + Send + 'scope, + F: Future + Send + 'env, { - let span = self.span.clone(); - let handle = self.handle.clone(); + let index = self.index.fetch_add(1, Ordering::Relaxed); + assert!(index < self.results.len(), "Too many tasks spawned"); + let result_cell: &Mutex> = &self.results[index]; + + let f: Box + Send + 'scope> = Box::new(async move { + let result = f.await; + *result_cell.lock() = Some(result); + }); + let f: *mut (dyn Future + Send + 'scope) = Box::into_raw(f); + // SAFETY: Scope ensures (e. g. in Drop) that spawned tasks is awaited before the + // lifetime `'env` ends. + #[allow( + clippy::unnecessary_cast, + reason = "Clippy thinks this is unnecessary, but it actually changes the lifetime" + )] + let f = f as *mut (dyn Future + Send + 'static); + // SAFETY: We just called `Box::into_raw`. + let f = unsafe { Box::from_raw(f) }; + // We pin the future in the box in memory to be able to await it. + let f = Pin::from(f); + let turbo_tasks = self.turbo_tasks.clone(); - self.scope.spawn(|scope| { - let _span = span.clone().entered(); - let _guard = handle.enter(); - turbo_tasks_scope(turbo_tasks.clone(), || { - body(&Scope { - scope, - span, - handle, - turbo_tasks, - }) - }) + let span = self.span.clone(); + + let inner = self.inner.clone(); + inner.remaining_tasks.fetch_add(1, Ordering::Relaxed); + self.handle.spawn(async move { + let result = AssertUnwindSafe( + async move { + if let Some(turbo_tasks) = turbo_tasks { + // Ensure that the turbo tasks context is maintained across the task. + turbo_tasks_future_scope(turbo_tasks, f).await; + } else { + // If no turbo tasks context is available, just run the future. + f.await; + } + } + .instrument(span), + ) + .catch_unwind() + .await; + let panic = result.err().map(|e| (e, index)); + inner.on_task_finished(panic); }); } } -/// A wrapper around [`rayon::in_place_scope`] that preserves the [`turbo_tasks_scope`]. -pub fn scope<'scope, Op, R>(op: Op) -> R +impl<'scope, 'env: 'scope, R: Send + 'env> Drop for Scope<'scope, 'env, R> { + fn drop(&mut self) { + self.inner.wait(); + } +} + +/// Helper method to spawn tasks in parallel, ensuring that all tasks are awaited and errors are +/// handled. Also ensures turbo tasks and tracing context are maintained across the tasks. +/// +/// Be aware that although this function avoids starving other independently spawned tasks, any +/// other code running concurrently in the same task will be suspended during the call to +/// block_in_place. This can happen e.g. when using the `join!` macro. To avoid this issue, call +/// `scope_and_block` in `spawn_blocking`. +pub fn scope_and_block<'env, F, R>(number_of_tasks: usize, f: F) -> impl Iterator where - Op: FnOnce(&Scope<'scope, '_>) -> R, + R: Send + 'env, + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, R>) + 'env, { - let span = tracing::Span::current(); - let handle = tokio::runtime::Handle::current(); - let turbo_tasks = turbo_tasks(); - rayon::in_place_scope(|scope| { - op(&Scope { - scope, - span, - handle, - turbo_tasks, + block_in_place(|| { + let mut results = Vec::with_capacity(number_of_tasks); + for _ in 0..number_of_tasks { + results.push(Mutex::new(None)); + } + let results = results.into_boxed_slice(); + let result = { + // SAFETY: We drop the Scope later. 
+ let scope = unsafe { Scope::new(&results) }; + catch_unwind(AssertUnwindSafe(|| f(&scope))) + }; + if let Err(panic) = result { + panic::resume_unwind(panic); + } + results.into_iter().map(|mutex| { + mutex + .into_inner() + .expect("All values are set when the scope returns without panic") }) }) } + +#[cfg(test)] +mod tests { + use std::panic::{AssertUnwindSafe, catch_unwind}; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_scope() { + let results = scope_and_block(1000, |scope| { + for i in 0..1000 { + scope.spawn(async move { i }); + } + }); + results.enumerate().for_each(|(i, result)| { + assert_eq!(result, i); + }); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_empty_scope() { + let results = scope_and_block(0, |scope| { + if false { + scope.spawn(async move { 42 }); + } + }); + assert_eq!(results.count(), 0); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_single_task() { + let results = scope_and_block(1, |scope| { + scope.spawn(async move { 42 }); + }) + .collect::>(); + assert_eq!(results, vec![42]); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_task_finish_before_scope() { + let results = scope_and_block(1, |scope| { + scope.spawn(async move { 42 }); + thread::sleep(std::time::Duration::from_millis(100)); + }) + .collect::>(); + assert_eq!(results, vec![42]); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_task_finish_after_scope() { + let results = scope_and_block(1, |scope| { + scope.spawn(async move { + thread::sleep(std::time::Duration::from_millis(100)); + 42 + }); + }) + .collect::>(); + assert_eq!(results, vec![42]); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_panic_in_scope_factory() { + let result = catch_unwind(AssertUnwindSafe(|| { + let _results = scope_and_block(1000, |scope| { + for i in 0..500 { + scope.spawn(async move { i }); + } + panic!("Intentional panic"); + }); + unreachable!(); + })); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().downcast_ref::<&str>(), + Some(&"Intentional panic") + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_panic_in_scope_task() { + let result = catch_unwind(AssertUnwindSafe(|| { + let _results = scope_and_block(1000, |scope| { + for i in 0..1000 { + scope.spawn(async move { + if i == 500 { + panic!("Intentional panic"); + } else if i == 501 { + panic!("Wrong intentional panic"); + } else { + i + } + }); + } + }); + unreachable!(); + })); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().downcast_ref::<&str>(), + Some(&"Intentional panic") + ); + } +} diff --git a/turbopack/crates/turbo-tasks/src/util.rs b/turbopack/crates/turbo-tasks/src/util.rs index bb39dff895224..ecc57de367173 100644 --- a/turbopack/crates/turbo-tasks/src/util.rs +++ b/turbopack/crates/turbo-tasks/src/util.rs @@ -1,8 +1,10 @@ use std::{ + cell::SyncUnsafeCell, error::Error as StdError, fmt::{Debug, Display}, future::Future, hash::{Hash, Hasher}, + mem::ManuallyDrop, ops::Deref, pin::Pin, sync::Arc, @@ -259,3 +261,124 @@ impl Fn(Pin<&mut F>, &mut Context<'a>) -> Poll> (this.wrapper)(this.future, cx) } } + +/// Similar to slice::chunks but for owned data. Chunks are Send and Sync to allow to use it for +/// parallelism. +pub fn into_chunks(data: Vec, chunk_size: usize) -> IntoChunks { + let (ptr, length, capacity) = data.into_raw_parts(); + // SAFETY: changing a pointer from T to SyncUnsafeCell> is safe as both types + // have repr(transparent). 
+    let ptr = ptr as *mut SyncUnsafeCell<ManuallyDrop<T>>;
+    // SAFETY: The ptr, length and capacity were from into_raw_parts(). This is the only place where
+    // we use ptr.
+    let data =
+        unsafe { Vec::<SyncUnsafeCell<ManuallyDrop<T>>>::from_raw_parts(ptr, length, capacity) };
+    IntoChunks {
+        data: Arc::new(data),
+        index: 0,
+        chunk_size,
+    }
+}
+
+pub struct IntoChunks<T> {
+    data: Arc<Vec<SyncUnsafeCell<ManuallyDrop<T>>>>,
+    index: usize,
+    chunk_size: usize,
+}
+
+impl<T> Iterator for IntoChunks<T> {
+    type Item = Chunk<T>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index < self.data.len() {
+            let end = self.data.len().min(self.index + self.chunk_size);
+            let item = Chunk {
+                data: Arc::clone(&self.data),
+                index: self.index,
+                end,
+            };
+            self.index = end;
+            Some(item)
+        } else {
+            None
+        }
+    }
+}
+
+impl<T> IntoChunks<T> {
+    fn next_item(&mut self) -> Option<T> {
+        if self.index < self.data.len() {
+            // SAFETY: We are the only owner of this chunk of data and we make sure that this item
+            // is not dropped again by advancing the index
+            let item = unsafe { ManuallyDrop::take(&mut *self.data[self.index].get()) };
+            self.index += 1;
+            Some(item)
+        } else {
+            None
+        }
+    }
+}
+
+impl<T> Drop for IntoChunks<T> {
+    fn drop(&mut self) {
+        // To avoid leaking memory we need to drop the remaining items
+        while self.next_item().is_some() {}
+    }
+}
+
+pub struct Chunk<T> {
+    data: Arc<Vec<SyncUnsafeCell<ManuallyDrop<T>>>>,
+    index: usize,
+    end: usize,
+}
+
+impl<T> Iterator for Chunk<T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index < self.end {
+            // SAFETY: We are the only owner of this chunk of data and we make sure that this item
+            // is not dropped again by advancing the index
+            let item = unsafe { ManuallyDrop::take(&mut *self.data[self.index].get()) };
+            self.index += 1;
+            Some(item)
+        } else {
+            None
+        }
+    }
+}
+
+impl<T> Drop for Chunk<T> {
+    fn drop(&mut self) {
+        // To avoid leaking memory we need to drop the remaining items
+        while self.next().is_some() {}
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_chunk_iterator() {
+        let data = [(); 10]
+            .into_iter()
+            .enumerate()
+            .map(|(i, _)| Arc::new(i))
+            .collect::<Vec<_>>();
+        let mut chunks = into_chunks(data.clone(), 3);
+        let mut first_chunk = chunks.next().unwrap();
+        let second_chunk = chunks.next().unwrap();
+        drop(chunks);
+        assert_eq!(
+            second_chunk.into_iter().map(|a| *a).collect::<Vec<_>>(),
+            vec![3, 4, 5]
+        );
+        assert_eq!(*first_chunk.next().unwrap(), 0);
+        assert_eq!(*first_chunk.next().unwrap(), 1);
+        drop(first_chunk);
+        for arc in data {
+            assert_eq!(Arc::strong_count(&arc), 1);
+        }
+    }
+}
diff --git a/turbopack/crates/turbopack/tests/node-file-trace.rs b/turbopack/crates/turbopack/tests/node-file-trace.rs
index 4670a3905dddb..8f4b0aa3d00be 100644
--- a/turbopack/crates/turbopack/tests/node-file-trace.rs
+++ b/turbopack/crates/turbopack/tests/node-file-trace.rs
@@ -272,7 +272,7 @@ fn test_cases() {}
 
 #[apply(test_cases)]
 fn node_file_trace_noop_backing_storage(#[case] input: CaseInput) {
-    node_file_trace(input, "noop_backing_storage", false, 1, 120, |_| {
+    node_file_trace(input, "noop_backing_storage", 1, 120, |_| {
         TurboTasks::new(TurboTasksBackend::new(
             turbo_tasks_backend::BackendOptions::default(),
             turbo_tasks_backend::noop_backing_storage(),
@@ -282,7 +282,7 @@ fn node_file_trace_noop_backing_storage(#[case] input: CaseInput) {
 
 #[apply(test_cases)]
 fn node_file_trace_persistent(#[case] input: CaseInput) {
-    node_file_trace(input, "persistent_cache", false, 2, 240, |directory_path| {
+    node_file_trace(input, "persistent_cache", 2, 240, |directory_path| {
         TurboTasks::new(TurboTasksBackend::new(
             turbo_tasks_backend::BackendOptions::default(),
             turbo_tasks_backend::default_backing_storage(
@@ -302,31 +302,18 @@ fn node_file_trace_persistent(#[case] input: CaseInput) {
 
 #[cfg(feature = "bench_against_node_nft")]
 #[apply(test_cases)]
-fn bench_against_node_nft_st(#[case] input: CaseInput) {
-    bench_against_node_nft_inner(input, false);
+fn bench_against_node_nft(#[case] input: CaseInput) {
+    bench_against_node_nft_inner(input);
 }
 
 #[cfg(feature = "bench_against_node_nft")]
-#[apply(test_cases)]
-fn bench_against_node_nft_mt(#[case] input: CaseInput) {
-    bench_against_node_nft_inner(input, true);
-}
-
-#[cfg(feature = "bench_against_node_nft")]
-fn bench_against_node_nft_inner(input: CaseInput, multi_threaded: bool) {
-    node_file_trace(
-        input,
-        "noop_backing_storage",
-        multi_threaded,
-        1,
-        120,
-        |_| {
-            TurboTasks::new(TurboTasksBackend::new(
-                turbo_tasks_backend::BackendOptions::default(),
-                turbo_tasks_backend::noop_backing_storage(),
-            ))
-        },
-    );
+fn bench_against_node_nft_inner(input: CaseInput) {
+    node_file_trace(input, "noop_backing_storage", 1, 120, |_| {
+        TurboTasks::new(TurboTasksBackend::new(
+            turbo_tasks_backend::BackendOptions::default(),
+            turbo_tasks_backend::noop_backing_storage(),
+        ))
+    });
 }
 
 #[turbo_tasks::function(operation)]
@@ -401,7 +388,6 @@ fn node_file_trace(
         expected_stderr,
     }: CaseInput,
     mode: &str,
-    multi_threaded: bool,
    run_count: i32,
    timeout_len: u64,
    create_turbo_tasks: impl Fn(&Path) -> Arc<TurboTasks<B>>,
 ) {
         LazyLock::new(|| Arc::new(Mutex::new(Vec::new())));
     let r = &mut {
-        let mut builder = if multi_threaded {
-            tokio::runtime::Builder::new_multi_thread()
-        } else {
-            tokio::runtime::Builder::new_current_thread()
-        };
+        let mut builder = tokio::runtime::Builder::new_multi_thread();
         builder.enable_all();
-        if !multi_threaded {
-            builder.max_blocking_threads(20);
-        }
+        builder.max_blocking_threads(20);
         builder.build().unwrap()
     };
     r.block_on(async move {
@@ -490,12 +470,7 @@ fn node_file_trace(
             bench_suites_lock.push(BenchSuite {
                 suite: input
                     .trim_start_matches("node-file-trace/integration/")
-                    .to_string()
-                    + (if multi_threaded {
-                        " (multi-threaded)"
-                    } else {
-                        ""
-                    }),
+                    .to_string(),
                 is_faster,
                 rust_duration,
                 node_duration,