Closed
Changes from all commits (35 commits)
cc36053 tests need to run in multi-thread pool (sokra, Aug 2, 2025)
560b47a fix hit count tracking (sokra, Aug 2, 2025)
4aa6e79 Turbopack: avoid using rayon in favor of tokio tasks (sokra, Jul 31, 2025)
a51f572 fixup (sokra, Jul 31, 2025)
e60822f fix result lifetime (sokra, Aug 1, 2025)
acf687b add tests (sokra, Aug 1, 2025)
1c6b869 make turbo_tasks context optional (sokra, Aug 1, 2025)
b50792b fixup tokio context (sokra, Aug 1, 2025)
9396a9d clippy (sokra, Aug 1, 2025)
a5e8556 clippy (sokra, Aug 2, 2025)
c3f2bdc rename methods for clippy (sokra, Aug 2, 2025)
4bb6b61 crash earlier without memory corruption when not in multi-thread pool (sokra, Aug 2, 2025)
c0e009e run node-file-trace tests multi threaded (sokra, Aug 3, 2025)
493a3a5 fix test name (sokra, Aug 4, 2025)
6cc1ac1 Turbopack: show timing event for database compaction (sokra, Aug 5, 2025)
d151be4 avoid initial compaction (sokra, Aug 5, 2025)
133e770 fixup timing event for compact (sokra, Aug 5, 2025)
3cec8e7 Update messages (sokra, Aug 5, 2025)
1f4164c Turbopack: use block in place for db writes (sokra, Aug 5, 2025)
8ade474 fix initial duplication set (sokra, Aug 5, 2025)
dfae0ca improve compaction config (sokra, Aug 5, 2025)
99924f8 update test case (sokra, Aug 6, 2025)
f5cb5fa Turbopack: parallel drop data before shutdown (sokra, Jul 30, 2025)
6dcc9a2 fixup (sokra, Aug 1, 2025)
298373d Turbopack: Decompress medium values lazily to reduce memory during co… (sokra, Aug 4, 2025)
bb5f3e4 clippy (sokra, Aug 4, 2025)
e32c74c fixup (sokra, Aug 6, 2025)
d6eaf49 refactor (de)compression code (sokra, Aug 6, 2025)
163af6a fixup (sokra, Aug 6, 2025)
63c49bb Turbopack: bigger small value blocks (sokra, Aug 5, 2025)
7f2fbee Turbopack: remove value compression dictionary (sokra, Aug 4, 2025)
9714c1e avoid duplicate code in compression dictionary creation (sokra, Aug 4, 2025)
557d515 avoid multiple iterations for compression dictionary (sokra, Aug 4, 2025)
d199089 fixup compute_dictionary (sokra, Aug 4, 2025)
b09c570 update zstd (sokra, Aug 4, 2025)
13 changes: 6 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion packages/next/src/server/dev/hot-reloader-turbopack.ts
@@ -258,7 +258,7 @@ export async function createHotReloaderTurbopack(
     }
   )
   backgroundLogCompilationEvents(project, {
-    eventTypes: ['StartupCacheInvalidationEvent'],
+    eventTypes: ['StartupCacheInvalidationEvent', 'TimingEvent'],
   })
   setBundlerFindSourceMapImplementation(
     getSourceMapFromTurbopack.bind(null, project, projectPath)
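Adding `'TimingEvent'` here means the dev server's background compilation-event logging now also surfaces timing events, such as the database-compaction timing event introduced elsewhere in this PR (commit 6cc1ac1).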
15 changes: 5 additions & 10 deletions turbopack/crates/turbo-persistence-tools/src/main.rs
@@ -3,7 +3,7 @@
 use std::path::PathBuf;
 
 use anyhow::{Context, Result, bail};
-use turbo_persistence::{MetaFileEntryInfo, TurboPersistence};
+use turbo_persistence::{MetaFileEntryInfo, SerialScheduler, TurboPersistence};
 
 fn main() -> Result<()> {
     // Get CLI argument
@@ -16,7 +16,7 @@ fn main() -> Result<()> {
         bail!("The provided path does not exist: {}", path.display());
     }
 
-    let db = TurboPersistence::open_read_only(path)?;
+    let db: TurboPersistence<SerialScheduler> = TurboPersistence::open_read_only(path)?;
     let meta_info = db
         .meta_info()
         .context("Failed to retrieve meta information")?;
@@ -35,7 +35,6 @@ fn main() -> Result<()> {
             amqf_entries,
             sst_size,
             key_compression_dictionary_size,
-            value_compression_dictionary_size,
             block_count,
         } in meta_file.entries
         {
@@ -45,15 +44,11 @@ fn main() -> Result<()> {
             );
             println!("  AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024);
             println!(
-                "  {} KiB = {} kiB key compression dict + {} KiB value compression dict + \
-                 {block_count} blocks (avg {} bytes/block)",
+                "  {} KiB = {} kiB key compression dict + {block_count} blocks (avg {} \
+                 bytes/block)",
                 sst_size / 1024,
                 key_compression_dictionary_size / 1024,
-                value_compression_dictionary_size / 1024,
-                (sst_size
-                    - key_compression_dictionary_size as u64
-                    - value_compression_dictionary_size as u64)
-                    / block_count as u64
+                (sst_size - key_compression_dictionary_size as u64) / block_count as u64
             );
         }
         if !meta_file.obsolete_sst_files.is_empty() {
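The new type annotation reflects that `TurboPersistence` is now generic over a scheduler, in line with the PR's move away from rayon: a single-threaded CLI tool opts into `SerialScheduler` explicitly. A minimal sketch restating just the calls visible in this diff (the helper name `open_for_inspection` is ours, not the tool's, and error handling is simplified):

```rust
use std::path::PathBuf;

use anyhow::{Context, Result};
use turbo_persistence::{SerialScheduler, TurboPersistence};

// Open the database read-only with the serial scheduler and fetch its
// metadata, exactly as main.rs does above.
fn open_for_inspection(path: PathBuf) -> Result<()> {
    let db: TurboPersistence<SerialScheduler> = TurboPersistence::open_read_only(path)?;
    let _meta_info = db
        .meta_info()
        .context("Failed to retrieve meta information")?;
    Ok(())
}
```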
4 changes: 2 additions & 2 deletions turbopack/crates/turbo-persistence/Cargo.toml
@@ -22,16 +22,16 @@ memmap2 = "0.9.5"
 parking_lot = { workspace = true }
 qfilter = { version = "0.2.4", features = ["serde"] }
 quick_cache = { workspace = true }
-rayon = { workspace = true }
 rustc-hash = { workspace = true }
 smallvec = { workspace = true}
 thread_local = { workspace = true }
 tracing = { workspace = true }
 twox-hash = { version = "2.0.1", features = ["xxhash64"] }
-zstd = { version = "0.13.2", features = ["zdict_builder"] }
+zstd = { version = "0.13.3", features = ["zdict_builder"] }
 
 [dev-dependencies]
 rand = { workspace = true, features = ["small_rng"] }
+rayon = { workspace = true }
 tempfile = { workspace = true }
 
 [lints]
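Note that rayon does not disappear from the manifest entirely: it moves from `[dependencies]` to `[dev-dependencies]`, so the library itself no longer pulls in a rayon thread pool (tokio tasks take over, per the commits above) while tests can still use it. The zstd bump from 0.13.2 to 0.13.3 keeps the `zdict_builder` feature that dictionary creation relies on.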
2 changes: 0 additions & 2 deletions turbopack/crates/turbo-persistence/README.md
@@ -45,7 +45,6 @@ A meta file can contain metadata about multiple SST files. The metadata is stored
 - foreach described SST file
   - 4 bytes sequence number of the SST file
   - 2 bytes key Compression Dictionary length
-  - 2 bytes value Compression Dictionary length
   - 2 bytes block count
   - 8 bytes min hash
   - 8 bytes max hash
@@ -59,7 +58,6 @@ A meta file can contain metadata about multiple SST files. The metadata is stored
 The SST file contains only data without any header.
 
 - serialized key Compression Dictionary
-- serialized value Compression Dictionary
 - foreach block
   - 4 bytes uncompressed block length
   - compressed data
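For orientation, the fields visible in this hunk form the first 24 bytes (4 + 2 + 2 + 8 + 8) of each per-SST-file entry once the value-dictionary length is gone; more fields follow in the part of the list GitHub collapsed. A sketch of reading them, where the struct and function names are hypothetical and little-endian byte order is an assumption for illustration only (the authoritative layout is this README plus the crate's serialization code):

```rust
/// First 24 bytes of a per-SST-file entry, per the list above.
struct MetaEntryHeader {
    sequence_number: u32,
    key_dict_length: u16, // the 2-byte value dictionary length no longer exists
    block_count: u16,
    min_hash: u64,
    max_hash: u64,
}

// Byte order assumed little-endian here for illustration.
fn parse_meta_entry_header(buf: &[u8; 24]) -> MetaEntryHeader {
    MetaEntryHeader {
        sequence_number: u32::from_le_bytes(buf[0..4].try_into().unwrap()),
        key_dict_length: u16::from_le_bytes(buf[4..6].try_into().unwrap()),
        block_count: u16::from_le_bytes(buf[6..8].try_into().unwrap()),
        min_hash: u64::from_le_bytes(buf[8..16].try_into().unwrap()),
        max_hash: u64::from_le_bytes(buf[16..24].try_into().unwrap()),
    }
}
```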
15 changes: 12 additions & 3 deletions turbopack/crates/turbo-persistence/src/collector.rs
@@ -1,3 +1,5 @@
+use std::mem::take;
+
 use crate::{
     ValueBuffer,
     collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey},
@@ -90,11 +92,11 @@ impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
         self.entries.push(entry);
     }
 
-    /// Sorts the entries and returns them along with the total key and value sizes. This doesn't
+    /// Sorts the entries and returns them along with the total key size. This doesn't
     /// clear the entries.
-    pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize, usize) {
+    pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize) {
         self.entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
-        (&self.entries, self.total_key_size, self.total_value_size)
+        (&self.entries, self.total_key_size)
     }
 
     /// Clears the collector.
@@ -111,4 +113,11 @@ impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
         self.total_value_size = 0;
         self.entries.drain(..)
     }
+
+    /// Clears the collector and drops the capacity
+    pub fn drop_contents(&mut self) {
+        drop(take(&mut self.entries));
+        self.total_key_size = 0;
+        self.total_value_size = 0;
+    }
 }
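The new `drop_contents` uses `std::mem::take` rather than `Vec::clear` because `clear` keeps the allocation alive for reuse, while `take` swaps in a fresh, capacity-0 vector and drops the old one immediately, releasing its memory. A standalone sketch of the idiom (plain standard-library behavior, independent of this crate):

```rust
use std::mem::take;

// `clear` would keep the capacity; `take` + `drop` releases it.
fn drop_capacity(v: &mut Vec<u8>) {
    drop(take(v)); // v is now a fresh Vec with capacity 0
}

fn main() {
    let mut v = Vec::with_capacity(1024);
    v.push(1u8);
    drop_capacity(&mut v);
    assert!(v.is_empty() && v.capacity() == 0);
}
```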
21 changes: 14 additions & 7 deletions turbopack/crates/turbo-persistence/src/compaction/selector.rs
@@ -136,8 +136,8 @@
             optimal_merge_count: 8,
             max_merge_count: 32,
             max_merge_bytes: 500 * MB,
-            min_merge_duplication_bytes: MB,
-            optimal_merge_duplication_bytes: 10 * MB,
+            min_merge_duplication_bytes: 50 * MB,
+            optimal_merge_duplication_bytes: 100 * MB,
             max_merge_segment_count: 8,
         }
     }
@@ -233,13 +233,20 @@ pub fn get_merge_segments<T: Compactable>(
             // We have reached the maximum number of merge jobs, so we stop here.
             break;
         }
-        let mut current_range = start_compactable.range();
+        let start_compactable_range = start_compactable.range();
+        let start_compactable_size = start_compactable.size();
+        let mut current_range = start_compactable_range.clone();
 
         // We might need to restart the search if we need to extend the range.
         'search: loop {
             let mut current_set = smallvec![start_index];
-            let mut current_size = start_compactable.size();
+            let mut current_size = start_compactable_size;
             let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
+            duplication.update(start_compactable_range.clone(), |dup_info| {
+                dup_info
+                    .get_or_insert_default()
+                    .add(start_compactable_size, &start_compactable_range);
+            });
             let mut current_skip = 0;
 
             // We will capture compactables in the current_range until we find a optimal merge
@@ -609,8 +616,8 @@ mod tests {
             min_merge_count: 2,
             optimal_merge_count: 4,
             max_merge_bytes: 5000,
-            min_merge_duplication_bytes: 200,
-            optimal_merge_duplication_bytes: 500,
+            min_merge_duplication_bytes: 500,
+            optimal_merge_duplication_bytes: 1000,
             max_merge_segment_count: 4,
         };
         let jobs = get_merge_segments(&containers, &config);
@@ -653,7 +660,7 @@ mod tests {
             println!("Number of compactions: {number_of_compactions}");
 
             let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
-            assert!(number_of_compactions < 40);
+            assert!(number_of_compactions < 30);
             assert!(containers.len() < 30);
             assert!(metrics.duplication < 0.5);
         }
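Two changes interact here. Seeding the `duplication` map with the starting compactable's own range and size means the file that anchors a candidate segment counts toward the duplication estimate from the first iteration, rather than only once a second file overlaps it (commit 8ade474, "fix initial duplication set"). And the thresholds move up sharply: previously a segment became eligible after eliminating just 1 MB of duplication, so the selector scheduled many small merges; with a 50 MB floor and a 100 MB optimum (500/1000 at the test's smaller scale), the same workload yields fewer, larger compactions, which is why the test's bound tightens from 40 to 30.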
56 changes: 56 additions & 0 deletions turbopack/crates/turbo-persistence/src/compression.rs
@@ -0,0 +1,56 @@
+use std::{mem::MaybeUninit, sync::Arc};
+
+use anyhow::{Context, Result};
+use lzzzz::lz4::{ACC_LEVEL_DEFAULT, decompress, decompress_with_dict};
+
+#[tracing::instrument(level = "trace", skip_all)]
+pub fn decompress_into_arc(
+    uncompressed_length: u32,
+    block: &[u8],
+    compression_dictionary: Option<&[u8]>,
+    _long_term: bool,
+) -> Result<Arc<[u8]>> {
+    // We allocate the buffer directly in an Arc to avoid copying it into an Arc later and to
+    // avoid double indirection. This is a dynamically sized Arc.
+    let buffer: Arc<[MaybeUninit<u8>]> = Arc::new_zeroed_slice(uncompressed_length as usize);
+    // Assume that the buffer is initialized.
+    let buffer = Arc::into_raw(buffer);
+    // Safety: Assuming that the buffer is initialized is safe because we just created it as
+    // zeroed slice and u8 doesn't require initialization.
+    let mut buffer = unsafe { Arc::from_raw(buffer as *mut [u8]) };
+    // Safety: We know that the buffer is not shared yet.
+    let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
+    let bytes_writes = if let Some(dict) = compression_dictionary {
+        // Safety: decompress_with_dict will only write to `decompressed` and not read from it.
+        decompress_with_dict(block, decompressed, dict)?
+    } else {
+        // Safety: decompress will only write to `decompressed` and not read from it.
+        decompress(block, decompressed)?
+    };
+    assert_eq!(
+        bytes_writes, uncompressed_length as usize,
+        "Decompressed length does not match expected length"
+    );
+    // Safety: The buffer is now fully initialized and can be used.
+    Ok(buffer)
+}
+
+#[tracing::instrument(level = "trace", skip_all)]
+pub fn compress_into_buffer(
+    block: &[u8],
+    dict: Option<&[u8]>,
+    _long_term: bool,
+    buffer: &mut Vec<u8>,
+) -> Result<()> {
+    let mut compressor = if let Some(dict) = dict {
+        lzzzz::lz4::Compressor::with_dict(dict)
+    } else {
+        lzzzz::lz4::Compressor::new()
+    }
+    .context("LZ4 compressor creation failed")?;
+    let acc_factor = ACC_LEVEL_DEFAULT;
+    compressor
+        .next_to_vec(block, buffer, acc_factor)
+        .context("Compression failed")?;
+    Ok(())
+}
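Since both helpers are shown in full above, a round-trip is easy to sketch. This assumes the test sits in the same module as compression.rs (nothing in this diff shows how the functions are exported), passes `None` for the dictionary, and `false` for the long-term flag, matching the signatures above:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn roundtrip() -> anyhow::Result<()> {
        let data = b"hello hello hello hello";
        // Compress into a reusable buffer, then decompress into a fresh Arc
        // of exactly the original length.
        let mut compressed = Vec::new();
        compress_into_buffer(data, None, false, &mut compressed)?;
        let restored = decompress_into_arc(data.len() as u32, &compressed, None, false)?;
        assert_eq!(&*restored, data.as_slice());
        Ok(())
    }
}
```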