Skip to content

Commit 286b6e3

Browse files
committed
Turbopack: Decompress medium values lazily to reduce memory during compaction
1 parent 1633ce1 commit 286b6e3

File tree

5 files changed

+128
-49
lines changed

5 files changed

+128
-49
lines changed

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -915,8 +915,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
915915
});
916916
}
917917

918-
fn create_sst_file(
919-
entries: &[LookupEntry],
918+
fn create_sst_file<'l>(
919+
entries: &[LookupEntry<'l>],
920920
total_key_size: usize,
921921
total_value_size: usize,
922922
path: &Path,
@@ -959,7 +959,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
959959

960960
let mut total_key_size = 0;
961961
let mut total_value_size = 0;
962-
let mut current: Option<LookupEntry> = None;
962+
let mut current: Option<LookupEntry<'_>> = None;
963963
let mut entries = Vec::new();
964964
let mut last_entries = Vec::new();
965965
let mut last_entries_total_sizes = (0, 0);
@@ -970,7 +970,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
970970
if let Some(current) = current.take() {
971971
if current.key != entry.key {
972972
let key_size = current.key.len();
973-
let value_size = current.value.size_in_sst();
973+
let value_size =
974+
current.value.uncompressed_size_in_sst();
974975
total_key_size += key_size;
975976
total_value_size += value_size;
976977

@@ -1017,7 +1018,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
10171018
}
10181019
if let Some(entry) = current {
10191020
total_key_size += entry.key.len();
1020-
total_value_size += entry.value.size_in_sst();
1021+
total_value_size += entry.value.uncompressed_size_in_sst();
10211022
entries.push(entry);
10221023
}
10231024

turbopack/crates/turbo-persistence/src/lookup_entry.rs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,43 @@ pub enum LookupValue {
1414
Blob { sequence_number: u32 },
1515
}
1616

17-
impl LookupValue {
17+
/// A value from a SST file lookup.
18+
pub enum LazyLookupValue<'l> {
19+
/// A LookupValue
20+
Eager(LookupValue),
21+
/// A medium sized value that is still compressed.
22+
Medium {
23+
uncompressed_size: u32,
24+
block: &'l [u8],
25+
dictionary: &'l [u8],
26+
},
27+
}
28+
29+
impl LazyLookupValue<'_> {
1830
/// Returns the size of the value in the SST file.
19-
pub fn size_in_sst(&self) -> usize {
31+
pub fn uncompressed_size_in_sst(&self) -> usize {
2032
match self {
21-
LookupValue::Slice { value } => value.len(),
22-
LookupValue::Deleted => 0,
23-
LookupValue::Blob { .. } => 0,
33+
LazyLookupValue::Eager(LookupValue::Slice { value }) => value.len(),
34+
LazyLookupValue::Eager(LookupValue::Deleted) => 0,
35+
LazyLookupValue::Eager(LookupValue::Blob { .. }) => 0,
36+
LazyLookupValue::Medium {
37+
uncompressed_size, ..
38+
} => *uncompressed_size as usize,
2439
}
2540
}
2641
}
2742

2843
/// An entry from a SST file lookup.
29-
pub struct LookupEntry {
44+
pub struct LookupEntry<'l> {
3045
/// The hash of the key.
3146
pub hash: u64,
3247
/// The key.
3348
pub key: ArcSlice<u8>,
3449
/// The value.
35-
pub value: LookupValue,
50+
pub value: LazyLookupValue<'l>,
3651
}
3752

38-
impl Entry for LookupEntry {
53+
impl Entry for LookupEntry<'_> {
3954
fn key_hash(&self) -> u64 {
4055
self.hash
4156
}
@@ -50,17 +65,26 @@ impl Entry for LookupEntry {
5065

5166
fn value(&self) -> EntryValue<'_> {
5267
match &self.value {
53-
LookupValue::Deleted => EntryValue::Deleted,
54-
LookupValue::Slice { value } => {
68+
LazyLookupValue::Eager(LookupValue::Deleted) => EntryValue::Deleted,
69+
LazyLookupValue::Eager(LookupValue::Slice { value }) => {
5570
if value.len() > MAX_SMALL_VALUE_SIZE {
5671
EntryValue::Medium { value }
5772
} else {
5873
EntryValue::Small { value }
5974
}
6075
}
61-
LookupValue::Blob { sequence_number } => EntryValue::Large {
76+
LazyLookupValue::Eager(LookupValue::Blob { sequence_number }) => EntryValue::Large {
6277
blob: *sequence_number,
6378
},
79+
LazyLookupValue::Medium {
80+
uncompressed_size,
81+
block,
82+
dictionary,
83+
} => EntryValue::MediumCompressed {
84+
uncompressed_size: *uncompressed_size,
85+
block,
86+
dictionary,
87+
},
6488
}
6589
}
6690
}

turbopack/crates/turbo-persistence/src/merge_iter.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,27 @@ use crate::lookup_entry::LookupEntry;
66

77
/// An active iterator that is being merged. It has peeked the next element and can be compared
88
/// according to that element. The `order` is used when multiple iterators have the same key.
9-
struct ActiveIterator<T: Iterator<Item = Result<LookupEntry>>> {
9+
struct ActiveIterator<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> {
1010
iter: T,
1111
order: usize,
12-
entry: LookupEntry,
12+
entry: LookupEntry<'l>,
1313
}
1414

15-
impl<T: Iterator<Item = Result<LookupEntry>>> PartialEq for ActiveIterator<T> {
15+
impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> PartialEq for ActiveIterator<'l, T> {
1616
fn eq(&self, other: &Self) -> bool {
1717
self.entry.hash == other.entry.hash && *self.entry.key == *other.entry.key
1818
}
1919
}
2020

21-
impl<T: Iterator<Item = Result<LookupEntry>>> Eq for ActiveIterator<T> {}
21+
impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> Eq for ActiveIterator<'l, T> {}
2222

23-
impl<T: Iterator<Item = Result<LookupEntry>>> PartialOrd for ActiveIterator<T> {
23+
impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> PartialOrd for ActiveIterator<'l, T> {
2424
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2525
Some(self.cmp(other))
2626
}
2727
}
2828

29-
impl<T: Iterator<Item = Result<LookupEntry>>> Ord for ActiveIterator<T> {
29+
impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> Ord for ActiveIterator<'l, T> {
3030
fn cmp(&self, other: &Self) -> Ordering {
3131
self.entry
3232
.hash
@@ -39,11 +39,11 @@ impl<T: Iterator<Item = Result<LookupEntry>>> Ord for ActiveIterator<T> {
3939

4040
/// An iterator that merges multiple sorted iterators into a single sorted iterator. Internal it
4141
/// uses an heap of iterators to iterate them in order.
42-
pub struct MergeIter<T: Iterator<Item = Result<LookupEntry>>> {
43-
heap: BinaryHeap<ActiveIterator<T>>,
42+
pub struct MergeIter<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> {
43+
heap: BinaryHeap<ActiveIterator<'l, T>>,
4444
}
4545

46-
impl<T: Iterator<Item = Result<LookupEntry>>> MergeIter<T> {
46+
impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> MergeIter<'l, T> {
4747
pub fn new(iters: impl Iterator<Item = T>) -> Result<Self> {
4848
let mut heap = BinaryHeap::new();
4949
for (order, mut iter) in iters.enumerate() {
@@ -56,8 +56,8 @@ impl<T: Iterator<Item = Result<LookupEntry>>> MergeIter<T> {
5656
}
5757
}
5858

59-
impl<T: Iterator<Item = Result<LookupEntry>>> Iterator for MergeIter<T> {
60-
type Item = Result<LookupEntry>;
59+
impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> Iterator for MergeIter<'l, T> {
60+
type Item = Result<LookupEntry<'l>>;
6161

6262
fn next(&mut self) -> Option<Self::Item> {
6363
let ActiveIterator {

turbopack/crates/turbo-persistence/src/static_sorted_file.rs

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use rustc_hash::FxHasher;
1818
use crate::{
1919
QueryKey,
2020
arc_slice::ArcSlice,
21-
lookup_entry::{LookupEntry, LookupValue},
21+
lookup_entry::{LazyLookupValue, LookupEntry, LookupValue},
2222
};
2323

2424
/// The block header for an index block.
@@ -339,6 +339,14 @@ impl StaticSortedFile {
339339

340340
/// Reads a block from the file.
341341
fn read_block(&self, block_index: u16, compression_dictionary: &[u8]) -> Result<ArcSlice<u8>> {
342+
let (uncompressed_length, block) = self.get_compressed_block(block_index)?;
343+
344+
let buffer = decompress_into_arc(uncompressed_length, block, compression_dictionary)?;
345+
Ok(ArcSlice::from(buffer))
346+
}
347+
348+
/// Gets the slice of the compressed block from the memory mapped file.
349+
fn get_compressed_block(&self, block_index: u16) -> Result<(u32, &[u8])> {
342350
#[cfg(feature = "strict_checks")]
343351
if block_index >= self.meta.block_count {
344352
bail!(
@@ -386,17 +394,9 @@ impl StaticSortedFile {
386394
self.meta.blocks_start()
387395
);
388396
}
389-
let uncompressed_length =
390-
(&self.mmap[block_start..block_start + 4]).read_u32::<BE>()? as usize;
391-
let block = self.mmap[block_start + 4..block_end].to_vec();
392-
393-
let buffer = Arc::new_zeroed_slice(uncompressed_length);
394-
// Safety: MaybeUninit<u8> can be safely transmuted to u8.
395-
let mut buffer = unsafe { transmute::<Arc<[MaybeUninit<u8>]>, Arc<[u8]>>(buffer) };
396-
// Safety: We know that the buffer is not shared yet.
397-
let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
398-
decompress_with_dict(&block, decompressed, compression_dictionary)?;
399-
Ok(ArcSlice::from(buffer))
397+
let uncompressed_length = (&self.mmap[block_start..block_start + 4]).read_u32::<BE>()?;
398+
let block = &self.mmap[block_start + 4..block_end];
399+
Ok((uncompressed_length, block))
400400
}
401401
}
402402

@@ -423,15 +423,15 @@ struct CurrentIndexBlock {
423423
index: usize,
424424
}
425425

426-
impl Iterator for StaticSortedFileIter<'_> {
427-
type Item = Result<LookupEntry>;
426+
impl<'l> Iterator for StaticSortedFileIter<'l> {
427+
type Item = Result<LookupEntry<'l>>;
428428

429429
fn next(&mut self) -> Option<Self::Item> {
430430
self.next_internal().transpose()
431431
}
432432
}
433433

434-
impl StaticSortedFileIter<'_> {
434+
impl<'l> StaticSortedFileIter<'l> {
435435
/// Enters a block at the given index.
436436
fn enter_block(&mut self, block_index: u16) -> Result<()> {
437437
let block_arc = self.this.get_key_block(block_index, self.key_block_cache)?;
@@ -468,7 +468,7 @@ impl StaticSortedFileIter<'_> {
468468
}
469469

470470
/// Gets the next entry in the file and moves the cursor.
471-
fn next_internal(&mut self) -> Result<Option<LookupEntry>> {
471+
fn next_internal(&mut self) -> Result<Option<LookupEntry<'l>>> {
472472
loop {
473473
if let Some(CurrentKeyBlock {
474474
offsets,
@@ -479,9 +479,22 @@ impl StaticSortedFileIter<'_> {
479479
{
480480
let GetKeyEntryResult { hash, key, ty, val } =
481481
get_key_entry(&offsets, &entries, entry_count, index)?;
482-
let value = self
483-
.this
484-
.handle_key_match(ty, val, self.value_block_cache)?;
482+
let value = if ty == KEY_BLOCK_ENTRY_TYPE_MEDIUM {
483+
let mut val = val;
484+
let block = val.read_u16::<BE>()?;
485+
let (uncompressed_size, block) = self.this.get_compressed_block(block)?;
486+
LazyLookupValue::Medium {
487+
uncompressed_size,
488+
block,
489+
dictionary: &self.this.mmap
490+
[self.this.meta.value_compression_dictionary_range()],
491+
}
492+
} else {
493+
let value = self
494+
.this
495+
.handle_key_match(ty, val, self.value_block_cache)?;
496+
LazyLookupValue::Eager(value)
497+
};
485498
let entry = LookupEntry {
486499
hash,
487500
// Safety: The key is a valid slice of the entries.
@@ -573,3 +586,27 @@ fn get_key_entry<'l>(
573586
}
574587
})
575588
}
589+
590+
pub fn decompress_into_arc(
591+
uncompressed_length: u32,
592+
block: &[u8],
593+
compression_dictionary: &[u8],
594+
) -> Result<Arc<[u8]>> {
595+
// We directly allocate the buffer in an Arc to avoid copying it into an Arc and avoiding
596+
// double indirection. This is a dynamically sized arc.
597+
let buffer = Arc::new_zeroed_slice(uncompressed_length as usize);
598+
// Safety: MaybeUninit<u8> can be safely transmuted to u8.
599+
// Safety: decompress_with_dict will only write to `buffer` and not read from it.
600+
// Safety: We check that decompress_with_dict will write all bytes.
601+
let mut buffer = unsafe { transmute::<Arc<[MaybeUninit<u8>]>, Arc<[u8]>>(buffer) };
602+
// Safety: We know that the buffer is not shared yet.
603+
let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
604+
// Safety: decompress_with_dict will only write to `decompressed` and not read from it.
605+
let bytes_writes = decompress_with_dict(&block, decompressed, compression_dictionary)?;
606+
assert_eq!(
607+
bytes_writes, uncompressed_length as usize,
608+
"Decompressed length does not match expected length"
609+
);
610+
// Safety: The buffer is now fully initialized and can be used.
611+
Ok(buffer)
612+
}

turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use lzzzz::lz4::{ACC_LEVEL_DEFAULT, max_compressed_size};
1212

1313
use crate::static_sorted_file::{
1414
BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY, KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED,
15-
KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL,
15+
KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL, decompress_into_arc,
1616
};
1717

1818
/// The maximum number of entries that should go into a single key block
@@ -68,6 +68,12 @@ pub enum EntryValue<'l> {
6868
Small { value: &'l [u8] },
6969
/// Medium-sized value. They are stored in their own value block.
7070
Medium { value: &'l [u8] },
71+
/// Medium-sized value. They are stored in their own value block. Precompressed.
72+
MediumCompressed {
73+
uncompressed_size: u32,
74+
block: &'l [u8],
75+
dictionary: &'l [u8],
76+
},
7177
/// Large-sized value. They are stored in a blob file.
7278
Large { blob: u32 },
7379
/// Tombstone. The value was removed.
@@ -386,7 +392,18 @@ fn write_value_blocks(
386392
value_locations.push((block_index, 0));
387393
writer.write_value_block(value, value_compression_dictionary)?;
388394
}
389-
_ => {
395+
EntryValue::MediumCompressed {
396+
uncompressed_size,
397+
block,
398+
dictionary,
399+
} => {
400+
let block_index = writer.next_block_index();
401+
value_locations.push((block_index, 0));
402+
// Recompress block with a different dictionary
403+
let decompressed = decompress_into_arc(uncompressed_size, block, dictionary)?;
404+
writer.write_value_block(&decompressed, value_compression_dictionary)?;
405+
}
406+
EntryValue::Deleted | EntryValue::Large { .. } => {
390407
value_locations.push((0, 0));
391408
}
392409
}
@@ -438,7 +455,7 @@ fn write_key_blocks_and_compute_amqf(
438455
value.len().try_into().unwrap(),
439456
);
440457
}
441-
EntryValue::Medium { .. } => {
458+
EntryValue::Medium { .. } | EntryValue::MediumCompressed { .. } => {
442459
block.put_medium(entry, value_location.0);
443460
}
444461
EntryValue::Large { blob } => {

0 commit comments

Comments
 (0)