Skip to content

Commit 5e9a527

Browse files
fix: size calculation for zstd frame cache (#5859)
Co-authored-by: Hubert <hubert@chainsafe.io>
1 parent 19c11c8 commit 5e9a527

9 files changed

Lines changed: 160 additions & 41 deletions

File tree

.clippy.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,7 @@ reason = """use `crate::utils::new_uuid_v4` instead."""
4343
[[disallowed-methods]]
4444
path = "tempfile::NamedTempFile::new"
4545
reason = """The temporary files created by this method are not persistable if the temporary directory lives on a different filesystem than the target directory. While it is valid in other contexts (if not persisting files), it was misused many times and so we are banning it. Consider using `tempfile::NamedTempFile::new_in` or `tempfile::NamedTempFile::Builder"""
46+
47+
[[disallowed-methods]]
48+
path = "lru::LruCache::unbounded"
49+
reason = """Avoid unbounded lru cache for potential memory leak"""

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
### Added
3131

32+
- [#5859](https://github.com/ChainSafe/forest/pull/5859) Added size metrics for zstd frame cache and made max size configurable via `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` environment variable.
33+
3234
### Changed
3335

3436
### Removed
@@ -37,6 +39,8 @@
3739

3840
- [#5863](https://github.com/ChainSafe/forest/pull/5863) Fixed needless GC runs on a stateless node.
3941

42+
- [#5859](https://github.com/ChainSafe/forest/pull/5859) Fixed size calculation for zstd frame cache.
43+
4044
## Forest v0.28.0 "Denethor's Folly"
4145

4246
This is a non-mandatory release recommended for all node operators. It includes numerous fixes and quality-of-life improvements for development and archival snapshot operations. It also includes a memory leak fix that would surface on long-running nodes.

docs/dictionary.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,4 @@ V0
117117
V1
118118
VPS
119119
WIP
120+
zstd

docs/docs/users/reference/env_variables.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ process.
4949
| `FOREST_SNAPSHOT_GC_INTERVAL_EPOCHS` | non-negative integer | 20160 | 8000 | The interval in epochs for scheduling snapshot GC |
5050
| `FOREST_SNAPSHOT_GC_CHECK_INTERVAL_SECONDS` | non-negative integer | 300 | 60 | The interval in seconds for checking if snapshot GC should run |
5151
| `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache |
52+
| `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 268435456 | 536870912 | The default zstd frame cache max size in bytes |
5253

5354
### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`
5455

src/db/car/any.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use crate::blocks::{Tipset, TipsetKey};
1313
use crate::utils::io::EitherMmapOrRandomAccessFile;
1414
use cid::Cid;
1515
use fvm_ipld_blockstore::Blockstore;
16-
use parking_lot::Mutex;
1716
use positioned_io::ReadAt;
1817
use std::borrow::Cow;
1918
use std::io::{Error, ErrorKind, Result};
@@ -89,7 +88,7 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
8988
}
9089

9190
/// Set the z-frame cache of the inner CAR reader.
92-
pub fn with_cache(self, cache: Arc<Mutex<ZstdFrameCache>>, key: CacheKey) -> Self {
91+
pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
9392
match self {
9493
AnyCar::Forest(f) => AnyCar::Forest(f.with_cache(cache, key)),
9594
AnyCar::Plain(p) => AnyCar::Plain(p),

src/db/car/forest.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ use futures::{Stream, TryStream, TryStreamExt as _};
6161
use fvm_ipld_blockstore::Blockstore;
6262
use fvm_ipld_encoding::to_vec;
6363
use nunny::Vec as NonEmpty;
64-
use parking_lot::Mutex;
6564
use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor};
6665
use std::io::{Seek, SeekFrom};
6766
use std::path::Path;
@@ -94,7 +93,7 @@ pub struct ForestCar<ReaderT> {
9493
cache_key: CacheKey,
9594
indexed: index::Reader<positioned_io::Slice<ReaderT>>,
9695
index_size_bytes: u32,
97-
frame_cache: Arc<Mutex<ZstdFrameCache>>,
96+
frame_cache: Arc<ZstdFrameCache>,
9897
roots: NonEmpty<Cid>,
9998
}
10099

@@ -113,7 +112,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
113112
cache_key: 0,
114113
indexed,
115114
index_size_bytes,
116-
frame_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
115+
frame_cache: Arc::new(ZstdFrameCache::default()),
117116
roots: header.roots,
118117
})
119118
}
@@ -179,7 +178,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
179178
}
180179
}
181180

182-
pub fn with_cache(self, cache: Arc<Mutex<ZstdFrameCache>>, key: CacheKey) -> Self {
181+
pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
183182
Self {
184183
cache_key: key,
185184
frame_cache: cache,
@@ -203,7 +202,7 @@ where
203202
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
204203
let indexed = &self.indexed;
205204
for position in indexed.get(*k)?.into_iter() {
206-
let cache_query = self.frame_cache.lock().get(position, self.cache_key, *k);
205+
let cache_query = self.frame_cache.get(position, self.cache_key, *k);
207206
match cache_query {
208207
// Frame cache hit, found value.
209208
Some(Some(val)) => return Ok(Some(val)),
@@ -220,12 +219,10 @@ where
220219
UviBytes::<Bytes>::default().decode_eof(&mut zstd_frame)?
221220
{
222221
let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?;
223-
block_map.insert(cid, data);
222+
block_map.insert(cid.into(), data);
224223
}
225-
let get_result = block_map.get(k).cloned();
226-
self.frame_cache
227-
.lock()
228-
.put(position, self.cache_key, block_map);
224+
let get_result = block_map.get(&(*k).into()).cloned();
225+
self.frame_cache.put(position, self.cache_key, block_map);
229226

230227
// This lookup only fails in case of a hash collision
231228
if let Some(value) = get_result {

src/db/car/many.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
2323
use anyhow::Context as _;
2424
use cid::Cid;
2525
use fvm_ipld_blockstore::Blockstore;
26-
use parking_lot::{Mutex, RwLock};
26+
use parking_lot::RwLock;
2727
use std::cmp::Ord;
2828
use std::collections::BinaryHeap;
2929
use std::{path::PathBuf, sync::Arc};
@@ -64,15 +64,15 @@ impl PartialEq for WithHeaviestEpoch {
6464
}
6565

6666
pub struct ManyCar<WriterT = MemoryDB> {
67-
shared_cache: Arc<Mutex<ZstdFrameCache>>,
67+
shared_cache: Arc<ZstdFrameCache>,
6868
read_only: Arc<RwLock<BinaryHeap<WithHeaviestEpoch>>>,
6969
writer: WriterT,
7070
}
7171

7272
impl<WriterT> ManyCar<WriterT> {
7373
pub fn new(writer: WriterT) -> Self {
7474
ManyCar {
75-
shared_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
75+
shared_cache: Arc::new(ZstdFrameCache::default()),
7676
read_only: Arc::new(RwLock::new(BinaryHeap::default())),
7777
writer,
7878
}

src/db/car/mod.rs

Lines changed: 102 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,22 @@ pub mod plain;
77

88
pub use any::AnyCar;
99
pub use forest::ForestCar;
10+
use get_size2::GetSize as _;
1011
pub use many::ManyCar;
1112
pub use plain::PlainCar;
1213

1314
use ahash::HashMap;
1415
use cid::Cid;
15-
use lru::LruCache;
1616
use positioned_io::{ReadAt, Size};
17+
use std::{
18+
num::NonZeroUsize,
19+
sync::{
20+
LazyLock,
21+
atomic::{AtomicUsize, Ordering},
22+
},
23+
};
24+
25+
use crate::utils::{cache::SizeTrackingLruCache, get_size::CidWrapper};
1726

1827
pub trait RandomAccessFileReader: ReadAt + Size + Send + Sync + 'static {}
1928
impl<X: ReadAt + Size + Send + Sync + 'static> RandomAccessFileReader for X {}
@@ -24,55 +33,128 @@ pub type CacheKey = u64;
2433

2534
type FrameOffset = u64;
2635

36+
pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
37+
const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
38+
if let Ok(value) = std::env::var(ENV_KEY) {
39+
if let Ok(size) = value.parse::<NonZeroUsize>() {
40+
let size = size.get();
41+
tracing::info!("zstd frame max size is set to {size} via {ENV_KEY}");
42+
return size;
43+
} else {
44+
tracing::error!(
45+
"Failed to parse {ENV_KEY}={value}, value should be a positive integer"
46+
);
47+
}
48+
}
49+
// 256 MiB
50+
256 * 1024 * 1024
51+
});
52+
2753
pub struct ZstdFrameCache {
2854
/// Maximum size in bytes. Pages will be evicted if the total size of the
2955
/// cache exceeds this amount.
3056
pub max_size: usize,
31-
current_size: usize,
32-
lru: LruCache<(FrameOffset, CacheKey), HashMap<Cid, Vec<u8>>>,
57+
current_size: AtomicUsize,
58+
lru: SizeTrackingLruCache<(FrameOffset, CacheKey), HashMap<CidWrapper, Vec<u8>>>,
3359
}
3460

3561
impl Default for ZstdFrameCache {
3662
fn default() -> Self {
37-
ZstdFrameCache::new(ZstdFrameCache::DEFAULT_SIZE)
63+
ZstdFrameCache::new(*ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE)
3864
}
3965
}
4066

4167
impl ZstdFrameCache {
42-
// 1 GiB
43-
pub const DEFAULT_SIZE: usize = 1024 * 1024 * 1024;
44-
4568
pub fn new(max_size: usize) -> Self {
4669
ZstdFrameCache {
4770
max_size,
48-
current_size: 0,
49-
lru: LruCache::unbounded(),
71+
current_size: AtomicUsize::new(0),
72+
lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry(
73+
"zstd_frame_cache".into(),
74+
),
5075
}
5176
}
5277

5378
/// Return a clone of the value associated with `cid`. If a value is found,
5479
/// the cache entry is moved to the top of the queue.
55-
pub fn get(&mut self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
80+
pub fn get(&self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
5681
self.lru
82+
.cache()
83+
.write()
5784
.get(&(offset, key))
58-
.map(|index| index.get(&cid).cloned())
85+
.map(|index| index.get(&cid.into()).cloned())
5986
}
6087

6188
/// Insert entry into lru-cache and evict pages if `max_size` has been exceeded.
62-
pub fn put(&mut self, offset: FrameOffset, key: CacheKey, index: HashMap<Cid, Vec<u8>>) {
63-
fn size_of_entry(entry: &HashMap<Cid, Vec<u8>>) -> usize {
64-
entry.values().map(Vec::len).sum::<usize>()
89+
pub fn put(&self, offset: FrameOffset, key: CacheKey, index: HashMap<CidWrapper, Vec<u8>>) {
90+
let lru_key = (offset, key);
91+
let lru_key_size = lru_key.get_size();
92+
let entry_size = index.get_size();
93+
// Skip large items
94+
if entry_size.saturating_add(lru_key_size) >= self.max_size {
95+
return;
6596
}
66-
self.current_size += size_of_entry(&index);
67-
if let Some(prev_entry) = self.lru.put((offset, key), index) {
68-
self.current_size -= size_of_entry(&prev_entry);
97+
98+
if let Some((_, prev_entry)) = self.lru.push(lru_key, index) {
99+
// keys are cancelled out
100+
self.current_size.fetch_add(entry_size, Ordering::Relaxed);
101+
self.current_size
102+
.fetch_sub(prev_entry.get_size(), Ordering::Relaxed);
103+
} else {
104+
self.current_size
105+
.fetch_add(entry_size.saturating_add(lru_key_size), Ordering::Relaxed);
69106
}
70-
while self.current_size > self.max_size {
71-
if let Some((_, entry)) = self.lru.pop_lru() {
72-
self.current_size -= size_of_entry(&entry)
107+
while self.current_size.load(Ordering::Relaxed) > self.max_size {
108+
if let Some((prev_key, prev_entry)) = self.lru.pop_lru() {
109+
self.current_size.fetch_sub(
110+
prev_key.get_size().saturating_add(prev_entry.get_size()),
111+
Ordering::Relaxed,
112+
);
73113
} else {
74114
break;
75115
}
76116
}
77117
}
78118
}
119+
120+
#[cfg(test)]
121+
mod tests {
122+
use super::*;
123+
use crate::utils::{multihash::MultihashCode, rand::forest_rng};
124+
use ahash::HashMap;
125+
use fvm_ipld_encoding::IPLD_RAW;
126+
use multihash_derive::MultihashDigest;
127+
use rand::Rng;
128+
129+
#[test]
130+
fn test_zstd_frame_cache_size() {
131+
let mut rng = forest_rng();
132+
let cache = ZstdFrameCache::new(10);
133+
for i in 0..100 {
134+
let index = gen_index(&mut rng);
135+
cache.put(i, i, index);
136+
assert_eq!(
137+
cache.current_size.load(Ordering::Relaxed),
138+
cache.lru.size_in_bytes()
139+
);
140+
let index2 = gen_index(&mut rng);
141+
cache.put(i, i, index2);
142+
assert_eq!(
143+
cache.current_size.load(Ordering::Relaxed),
144+
cache.lru.size_in_bytes()
145+
);
146+
}
147+
}
148+
149+
fn gen_index(rng: &mut impl Rng) -> HashMap<CidWrapper, Vec<u8>> {
150+
let mut map = HashMap::default();
151+
for _ in 0..10 {
152+
let vec_len = rng.gen_range(64..1024);
153+
let mut data = vec![0; vec_len];
154+
rng.fill_bytes(&mut data);
155+
let cid = Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data));
156+
map.insert(cid.into(), data);
157+
}
158+
map
159+
}
160+
}

src/utils/cache/lru.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,29 @@ where
5959
registry.register_collector(Box::new(self.clone()));
6060
}
6161

62-
pub fn new_without_metrics_registry(
63-
cache_name: Cow<'static, str>,
64-
capacity: NonZeroUsize,
65-
) -> Self {
62+
fn new_inner(cache_name: Cow<'static, str>, capacity: Option<NonZeroUsize>) -> Self {
6663
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0);
6764

6865
Self {
6966
cache_id: ID_GENERATOR.fetch_add(1, Ordering::Relaxed),
7067
cache_name,
71-
cache: Arc::new(RwLock::new(LruCache::new(capacity))),
68+
#[allow(clippy::disallowed_methods)]
69+
cache: Arc::new(RwLock::new(
70+
capacity
71+
.map(LruCache::new)
72+
// For constructing lru cache that is bounded by memory usage instead of length
73+
.unwrap_or_else(LruCache::unbounded),
74+
)),
7275
}
7376
}
7477

78+
pub fn new_without_metrics_registry(
79+
cache_name: Cow<'static, str>,
80+
capacity: NonZeroUsize,
81+
) -> Self {
82+
Self::new_inner(cache_name, Some(capacity))
83+
}
84+
7585
pub fn new_with_metrics_registry(
7686
cache_name: Cow<'static, str>,
7787
capacity: NonZeroUsize,
@@ -89,6 +99,23 @@ where
8999
Self::new_with_metrics_registry(cache_name, capacity, &mut default_registry())
90100
}
91101

102+
pub fn unbounded_without_metrics_registry(cache_name: Cow<'static, str>) -> Self {
103+
Self::new_inner(cache_name, None)
104+
}
105+
106+
pub fn unbounded_with_metrics_registry(
107+
cache_name: Cow<'static, str>,
108+
metrics_registry: &mut Registry,
109+
) -> Self {
110+
let c = Self::unbounded_without_metrics_registry(cache_name);
111+
c.register_metrics(metrics_registry);
112+
c
113+
}
114+
115+
pub fn unbounded_with_default_metrics_registry(cache_name: Cow<'static, str>) -> Self {
116+
Self::unbounded_with_metrics_registry(cache_name, &mut default_registry())
117+
}
118+
92119
pub fn cache(&self) -> &Arc<RwLock<LruCache<K, V>>> {
93120
&self.cache
94121
}
@@ -113,6 +140,10 @@ where
113140
self.cache.read().peek(k).cloned()
114141
}
115142

143+
pub fn pop_lru(&self) -> Option<(K, V)> {
144+
self.cache.write().pop_lru()
145+
}
146+
116147
pub fn len(&self) -> usize {
117148
self.cache.read().len()
118149
}
@@ -121,7 +152,7 @@ where
121152
self.cache.read().cap().get()
122153
}
123154

124-
fn size_in_bytes(&self) -> usize {
155+
pub(crate) fn size_in_bytes(&self) -> usize {
125156
let mut size = 0_usize;
126157
for (k, v) in self.cache.read().iter() {
127158
size = size

0 commit comments

Comments
 (0)