
Commit fb0fe1b

Turbopack: add parallel execution helpers
1 parent a16c8a9 commit fb0fe1b

File tree

5 files changed

+712 -38 lines changed


turbopack/crates/turbo-tasks/src/lib.rs

Lines changed: 4 additions & 2 deletions
@@ -37,6 +37,8 @@
 #![feature(never_type)]
 #![feature(downcast_unchecked)]
 #![feature(ptr_metadata)]
+#![feature(sync_unsafe_cell)]
+#![feature(vec_into_raw_parts)]
 
 pub mod backend;
 mod capture_future;
@@ -64,13 +66,14 @@ mod no_move_vec;
 mod once_map;
 mod output;
 pub mod panic_hooks;
+pub mod parallel;
 pub mod persisted_graph;
 pub mod primitives;
 mod raw_vc;
 mod read_options;
 mod read_ref;
 pub mod registry;
-mod scope;
+pub mod scope;
 mod serialization_invalidation;
 pub mod small_duration;
 mod spawn;
@@ -115,7 +118,6 @@ pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
 pub use read_options::ReadCellOptions;
 pub use read_ref::ReadRef;
 use rustc_hash::FxHasher;
-pub use scope::scope;
 pub use serialization_invalidation::SerializationInvalidator;
 pub use shrink_to_fit::ShrinkToFit;
 pub use spawn::{JoinHandle, block_for_future, spawn, spawn_blocking, spawn_thread};

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 4 additions & 0 deletions
@@ -1678,6 +1678,10 @@ pub fn turbo_tasks() -> Arc<dyn TurboTasksApi> {
     TURBO_TASKS.with(|arc| arc.clone())
 }
 
+pub fn try_turbo_tasks() -> Option<Arc<dyn TurboTasksApi>> {
+    TURBO_TASKS.try_with(|arc| arc.clone()).ok()
+}
+
 pub fn with_turbo_tasks<T>(func: impl FnOnce(&Arc<dyn TurboTasksApi>) -> T) -> T {
     TURBO_TASKS.with(|arc| func(arc))
 }
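The new try_turbo_tasks accessor mirrors turbo_tasks() but uses try_with, so it returns None instead of panicking when no TURBO_TASKS value has been set for the current task. A minimal sketch of the calling pattern this enables; the wrapper function below is illustrative only and not part of the commit:

// Illustrative helper (not part of the commit): only run turbo-tasks specific
// work when a TurboTasks context is actually installed.
fn with_optional_turbo_tasks(run: impl FnOnce(&Arc<dyn TurboTasksApi>)) {
    match try_turbo_tasks() {
        // Inside a managed task: the same Arc that turbo_tasks() would return.
        Some(tt) => run(&tt),
        // No context installed (turbo_tasks() would panic here): skip the work.
        None => {}
    }
}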
turbopack/crates/turbo-tasks/src/parallel.rs

Lines changed: 308 additions & 0 deletions
@@ -0,0 +1,308 @@
//! Parallel for each and map using tokio tasks.
//!
//! This avoids the problem of sleeping threads with mimalloc when using rayon in combination
//! with tokio. It also avoids having multiple thread pools.
//! See also https://pwy.io/posts/mimalloc-cigarette/

use std::{sync::LazyLock, thread::available_parallelism};

use crate::{scope::scope_and_block, util::into_chunks};

/// Calculates a good chunk size for parallel processing based on the number of available threads.
/// This is used to ensure that the workload is evenly distributed across the threads.
fn good_chunk_size(len: usize) -> usize {
    static GOOD_CHUNK_COUNT: LazyLock<usize> =
        LazyLock::new(|| available_parallelism().map_or(16, |c| c.get() * 4));
    let min_chunk_count = *GOOD_CHUNK_COUNT;
    len.div_ceil(min_chunk_count)
}

pub fn for_each<'l, T, F>(items: &'l [T], f: F)
where
    T: Sync,
    F: Fn(&'l T) + Send + Sync,
{
    let len = items.len();
    if len <= 1 {
        for item in items {
            f(item);
        }
        return;
    }
    let chunk_size = good_chunk_size(len);
    let f = &f;
    let _results = scope_and_block(len.div_ceil(chunk_size), |scope| {
        for chunk in items.chunks(chunk_size) {
            scope.spawn(async move {
                for item in chunk {
                    f(item);
                }
            })
        }
    });
}

pub fn for_each_owned<T>(items: Vec<T>, f: impl Fn(T) + Send + Sync)
where
    T: Send + Sync,
{
    let len = items.len();
    if len <= 1 {
        for item in items {
            f(item);
        }
        return;
    }
    let chunk_size = good_chunk_size(len);
    let f = &f;
    let _results = scope_and_block(len.div_ceil(chunk_size), |scope| {
        for chunk in into_chunks(items, chunk_size) {
            scope.spawn(async move {
                // SAFETY: Even when f() panics we drop all items in the chunk.
                for item in chunk {
                    f(item);
                }
            })
        }
    });
}

pub fn try_for_each<'l, T, E>(
    items: &'l [T],
    f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync,
) -> Result<(), E>
where
    T: Sync,
    E: Send + 'static,
{
    let len = items.len();
    if len <= 1 {
        for item in items {
            f(item)?;
        }
        return Ok(());
    }
    let chunk_size = good_chunk_size(len);
    let f = &f;
    scope_and_block(len.div_ceil(chunk_size), |scope| {
        for chunk in items.chunks(chunk_size) {
            scope.spawn(async move {
                for item in chunk {
                    f(item)?;
                }
                Ok(())
            })
        }
    })
    .collect::<Result<(), E>>()
}

pub fn try_for_each_mut<'l, T, E>(
    items: &'l mut [T],
    f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync,
) -> Result<(), E>
where
    T: Send + Sync,
    E: Send + 'static,
{
    let len = items.len();
    if len <= 1 {
        for item in items {
            f(item)?;
        }
        return Ok(());
    }
    let chunk_size = good_chunk_size(len);
    let f = &f;
    scope_and_block(len.div_ceil(chunk_size), |scope| {
        for chunk in items.chunks_mut(chunk_size) {
            scope.spawn(async move {
                for item in chunk {
                    f(item)?;
                }
                Ok(())
            })
        }
    })
    .collect::<Result<(), E>>()
}

pub fn try_for_each_owned<T, E>(
    items: Vec<T>,
    f: impl (Fn(T) -> Result<(), E>) + Send + Sync,
) -> Result<(), E>
where
    T: Send + Sync,
    E: Send + 'static,
{
    let len = items.len();
    if len <= 1 {
        for item in items {
            f(item)?;
        }
        return Ok(());
    }
    let chunk_size = good_chunk_size(len);
    let f = &f;
    scope_and_block(len.div_ceil(chunk_size), |scope| {
        for chunk in into_chunks(items, chunk_size) {
            scope.spawn(async move {
                for item in chunk {
                    f(item)?;
                }
                Ok(())
            })
        }
    })
    .collect::<Result<(), E>>()
}

pub fn map_collect<'l, Item, PerItemResult, Result>(
    items: &'l [Item],
    f: impl Fn(&'l Item) -> PerItemResult + Send + Sync,
) -> Result
where
    Item: Sync,
    PerItemResult: Send + Sync + 'l,
    Result: FromIterator<PerItemResult>,
{
    let len = items.len();
    if len == 0 {
        // No items to process, return an empty collection
        return Result::from_iter(std::iter::empty());
    }
    let chunk_size = good_chunk_size(len);
    let f = &f;
    scope_and_block(len.div_ceil(chunk_size), |scope| {
        for chunk in items.chunks(chunk_size) {
            scope.spawn(async move { chunk.iter().map(f).collect::<Vec<_>>() })
        }
    })
    .flatten()
    .collect()
}

pub fn map_collect_owned<'l, Item, PerItemResult, Result>(
    items: Vec<Item>,
    f: impl Fn(Item) -> PerItemResult + Send + Sync,
) -> Result
where
    Item: Send + Sync,
    PerItemResult: Send + Sync + 'l,
    Result: FromIterator<PerItemResult>,
{
    let len = items.len();
    if len == 0 {
        // No items to process, return an empty collection
        return Result::from_iter(std::iter::empty());
    }
    let chunk_size = good_chunk_size(len);
    let f = &f;
    scope_and_block(len.div_ceil(chunk_size), |scope| {
        for chunk in into_chunks(items, chunk_size) {
            scope.spawn(async move { chunk.map(f).collect::<Vec<_>>() })
        }
    })
    .flatten()
    .collect()
}

#[cfg(test)]
mod tests {
    use std::{
        panic::{AssertUnwindSafe, catch_unwind},
        sync::atomic::{AtomicI32, Ordering},
    };

    use super::*;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel_for_each() {
        let input = vec![1, 2, 3, 4, 5];
        let sum = AtomicI32::new(0);
        for_each(&input, |&x| {
            sum.fetch_add(x, Ordering::SeqCst);
        });
        assert_eq!(sum.load(Ordering::SeqCst), 15);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel_try_for_each() {
        let input = vec![1, 2, 3, 4, 5];
        let result = try_for_each(&input, |&x| {
            if x % 2 == 0 {
                Ok(())
            } else {
                Err(format!("Odd number {x} encountered"))
            }
        });
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "Odd number 1 encountered");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel_try_for_each_mut() {
        let mut input = vec![1, 2, 3, 4, 5];
        let result = try_for_each_mut(&mut input, |x| {
            *x += 10;
            if *x % 2 == 0 {
                Ok(())
            } else {
                Err(format!("Odd number {} encountered", *x))
            }
        });
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "Odd number 11 encountered");
        assert_eq!(input, vec![11, 12, 13, 14, 15]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel_for_each_owned() {
        let input = vec![1, 2, 3, 4, 5];
        let sum = AtomicI32::new(0);
        for_each_owned(input, |x| {
            sum.fetch_add(x, Ordering::SeqCst);
        });
        assert_eq!(sum.load(Ordering::SeqCst), 15);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel_map_collect() {
        let input = vec![1, 2, 3, 4, 5];
        let result: Vec<_> = map_collect(&input, |&x| x * 2);
        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel_map_collect_owned() {
        let input = vec![1, 2, 3, 4, 5];
        let result: Vec<_> = map_collect_owned(input, |x| x * 2);
        assert_eq!(result, vec![2, 4, 6, 8, 10]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_parallel_map_collect_owned_many() {
        let input = vec![1; 1000];
        let result: Vec<_> = map_collect_owned(input, |x| x * 2);
        assert_eq!(result, vec![2; 1000]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_panic_in_scope() {
        let result = catch_unwind(AssertUnwindSafe(|| {
            let mut input = vec![1; 1000];
            input[744] = 2;
            for_each(&input, |x| {
                if *x == 2 {
                    panic!("Intentional panic");
                }
            });
            panic!("Should not get here")
        }));
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().downcast_ref::<&str>(),
            Some(&"Intentional panic")
        );
    }
}
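A rough usage sketch of the new helpers from outside the crate, assuming the caller already runs on a tokio multi-thread runtime. The function name lengths_of and its data are illustrative only; for_each and map_collect_owned are the helpers defined above, reachable through the pub mod parallel; added to lib.rs:

use turbo_tasks::parallel;

// Illustrative only: fan work out over a batch of strings. The helpers are
// synchronous: they chunk the input via good_chunk_size, spawn one task per
// chunk through scope_and_block, and only return once every chunk is done.
fn lengths_of(paths: Vec<String>) -> Vec<usize> {
    // Borrowing variant: run a side-effecting closure over each item.
    parallel::for_each(&paths, |p| {
        debug_assert!(!p.is_empty());
    });
    // Owned variant: consume the Vec and collect one result per item
    // (the tests above assert that input order is preserved).
    parallel::map_collect_owned(paths, |p| p.len())
}

For scale: on a machine reporting 8 cores, GOOD_CHUNK_COUNT is 8 * 4 = 32, so 1,000 items are split into chunks of ceil(1000 / 32) = 32 items (roughly 32 spawned tasks), while the 5-item inputs in the tests above get chunks of a single item.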

0 commit comments
