Skip to content

Heap profiler with leak tracking #5763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ assert-json-diff = "2"
async-compression = { version = "0.4", features = ["tokio", "gzip"] }
async-speed-limit = "0.4"
async-trait = "0.1"
backtrace = "0.3"
base64 = "0.22"
binggan = { version = "0.14" }
bytes = { version = "1", features = ["serde"] }
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ quickwit-storage = { workspace = true, features = ["testsuite"] }

[features]
jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"]
jemalloc-profiled = [
"quickwit-common/jemalloc-profiled",
"quickwit-serve/jemalloc-profiled"
]
ci-test = []
pprof = ["quickwit-serve/pprof"]
openssl-support = ["openssl-probe"]
Expand Down Expand Up @@ -127,6 +131,10 @@ release-macos-feature-vendored-set = [
"quickwit-metastore/postgres",
"quickwit-doc-mapper/multilang",
]
release-heap-profiled = [
"release-feature-set",
"jemalloc-profiled"
]

[package.metadata.cargo-machete]
# used to enable the `multilang` feature
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-cli/src/jemalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use tikv_jemallocator::Jemalloc;
use tracing::error;

// An attribute only applies to the item that immediately follows it, so
// `#[global_allocator]` has to be repeated on both cfg-selected statics.
// Otherwise the build without the `jemalloc-profiled` feature would declare
// `GLOBAL` without ever registering it as the global allocator.

/// Global allocator when heap profiling is enabled: jemalloc wrapped in the
/// `JemallocProfiled` allocator from `quickwit_common`.
#[cfg(feature = "jemalloc-profiled")]
#[global_allocator]
pub static GLOBAL: quickwit_common::jemalloc_profiled::JemallocProfiled =
    quickwit_common::jemalloc_profiled::JemallocProfiled(Jemalloc);

/// Global allocator when heap profiling is disabled: plain jemalloc.
#[cfg(not(feature = "jemalloc-profiled"))]
#[global_allocator]
pub static GLOBAL: Jemalloc = Jemalloc;

const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1);
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ license.workspace = true
anyhow = { workspace = true }
async-speed-limit = { workspace = true }
async-trait = { workspace = true }
backtrace = { workspace = true, optional = true }
bytesize = { workspace = true }
coarsetime = { workspace = true }
dyn-clone = { workspace = true }
Expand All @@ -37,6 +38,8 @@ siphasher = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tikv-jemallocator = { workspace = true, optional = true }
tikv-jemalloc-ctl = { workspace = true, optional = true }
tokio = { workspace = true }
tokio-metrics = { workspace = true }
tokio-stream = { workspace = true }
Expand All @@ -47,9 +50,15 @@ tracing = { workspace = true }
[features]
testsuite = []
named_tasks = ["tokio/tracing"]
jemalloc-profiled = [
"dep:backtrace",
"dep:tikv-jemallocator",
"dep:tikv-jemalloc-ctl"
]

[dev-dependencies]
serde_json = { workspace = true }
tempfile = { workspace = true }
proptest = { workspace = true }
serial_test = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
236 changes: 236 additions & 0 deletions quickwit/quickwit-common/src/alloc_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Mutex;

use bytesize::ByteSize;
use once_cell::sync::Lazy;

/// Process-wide tracker state shared by `init`, `record_allocation` and
/// `record_deallocation`.
///
/// It is built on first access from `Allocations::default()` and guarded by a
/// mutex; every access in this module goes through `lock()`.
static ALLOCATION_TRACKER: Lazy<Mutex<Allocations>> = Lazy::new(Mutex::default);

/// Bookkeeping for one recorded allocation.
///
/// Instances are stored in `Allocations::memory_locations`, keyed by the
/// address of the allocation.
#[derive(Debug)]
struct Allocation {
/// Hash identifying the call site this allocation is attributed to.
pub callsite_hash: u64,
/// Size that was recorded for this allocation.
pub size: ByteSize,
}

/// Running totals for the tracked allocations that share one callsite hash.
#[derive(Debug, Copy, Clone)]
pub struct Statistic {
/// Number of recorded allocations for this call site that have not been
/// recorded as deallocated yet.
pub count: u64,
/// Sum of the recorded sizes of those allocations.
pub size: ByteSize,
/// Value of `size` the last time this call site was reported; zero until the
/// first report.
pub last_report: ByteSize,
}

/// Whether the tracker is currently recording.
#[derive(Debug)]
enum Status {
/// Recording is active. `reporting_interval` is how far a call site's total
/// size must grow past its last reported value before it is reported again.
Started { reporting_interval: ByteSize },
/// Nothing is recorded. This is the state of a freshly built tracker.
Stopped,
}

/// State of the allocation tracker, kept behind the `ALLOCATION_TRACKER` mutex.
///
/// WARN:
/// - keys and values in these maps should not allocate!
/// - we assume HashMaps don't allocate if their capacity is not exceeded
#[derive(Debug)]
struct Allocations {
/// Tracked allocations, keyed by their address (the pointer cast to `usize`).
memory_locations: HashMap<usize, Allocation>,
/// Per-call-site totals, keyed by callsite hash.
callsite_statistics: HashMap<u64, Statistic>,
/// Whether recording is active and, if so, with which reporting interval.
status: Status,
}

impl Default for Allocations {
    /// Builds a stopped tracker with both maps allocated up front.
    ///
    /// The recording functions refuse to insert once a map has no spare
    /// capacity, so these capacities bound how much can be tracked.
    fn default() -> Self {
        const MEMORY_LOCATIONS_CAPACITY: usize = 128 * 1024;
        const CALLSITE_STATISTICS_CAPACITY: usize = 32 * 1024;

        let memory_locations = HashMap::with_capacity(MEMORY_LOCATIONS_CAPACITY);
        let callsite_statistics = HashMap::with_capacity(CALLSITE_STATISTICS_CAPACITY);
        Self {
            memory_locations,
            callsite_statistics,
            status: Status::Stopped,
        }
    }
}

pub fn init(reporting_interval_bytes: u64) {
let mut guard = ALLOCATION_TRACKER.lock().unwrap();
guard.memory_locations.clear();
guard.callsite_statistics.clear();
guard.status = Status::Started {
reporting_interval: ByteSize(reporting_interval_bytes),
}
}

/// Outcome of a call to `record_allocation`.
pub enum AllocRecordingResponse {
/// The allocation was recorded and pushed its call site's total size past
/// the last reported value by more than the reporting interval. Carries a
/// copy of the call site's statistic taken before `last_report` is updated.
ThresholdExceeded(Statistic),
/// The allocation was recorded but its call site does not need reporting.
ThresholdNotExceeded,
/// One of the tracker's pre-sized maps has no spare capacity, so the
/// allocation was not recorded. The payload is a static label supplied by
/// `record_allocation`.
TrackerFull(&'static str),
/// The tracker is not started; nothing was recorded.
NotStarted,
}

/// Records an allocation and occasionally reports the cumulated allocation size
/// for the provided callsite_hash.
///
/// Every time the total allocated size with the same callsite_hash
/// exceeds the previous reported value by more than reporting_interval, that
/// allocated size is reported.
///
/// Returns:
/// - `NotStarted` if the tracker is stopped (nothing is recorded),
/// - `TrackerFull(name)` if the map called `name` has no spare capacity
///   (nothing is recorded),
/// - `ThresholdExceeded(stat)` with a copy of the call site's statistic taken
///   before its `last_report` is bumped,
/// - `ThresholdNotExceeded` otherwise.
///
/// # Panics
///
/// Panics if the tracker mutex is poisoned.
///
/// WARN: this function should not allocate!
pub fn record_allocation(
    callsite_hash: u64,
    size_bytes: u64,
    ptr: *mut u8,
) -> AllocRecordingResponse {
    let mut guard = ALLOCATION_TRACKER.lock().unwrap();
    let Status::Started { reporting_interval } = guard.status else {
        return AllocRecordingResponse::NotStarted;
    };
    // Refuse to insert as soon as either map has no spare slot: growing a map
    // would allocate, which this function must never do.
    if guard.memory_locations.capacity() == guard.memory_locations.len() {
        return AllocRecordingResponse::TrackerFull("memory_locations");
    }
    if guard.callsite_statistics.capacity() == guard.callsite_statistics.len() {
        // Name the map that is actually full (this used to report
        // "memory_locations" as well).
        return AllocRecordingResponse::TrackerFull("callsite_statistics");
    }
    guard.memory_locations.insert(
        ptr as usize,
        Allocation {
            callsite_hash,
            size: ByteSize(size_bytes),
        },
    );
    let entry = guard
        .callsite_statistics
        .entry(callsite_hash)
        .and_modify(|stat| {
            stat.count += 1;
            stat.size += size_bytes;
        })
        .or_insert(Statistic {
            count: 1,
            size: ByteSize(size_bytes),
            last_report: ByteSize(0),
        });
    let new_threshold_exceeded = entry.size > (entry.last_report + reporting_interval);
    if new_threshold_exceeded {
        // Copy first so the caller sees the previous `last_report` value.
        let reported_statistic = *entry;
        entry.last_report = entry.size;
        AllocRecordingResponse::ThresholdExceeded(reported_statistic)
    } else {
        AllocRecordingResponse::ThresholdNotExceeded
    }
}

/// Records that the allocation at `ptr` was freed.
///
/// Does nothing when the tracker is stopped or when `ptr` is not tracked
/// (for instance because it was allocated before tracking started).
/// Otherwise the allocation is forgotten and its call site's count and size
/// are decreased (saturating at zero); a call site whose count reaches zero is
/// removed from the statistics.
///
/// # Panics
///
/// Panics if the tracker mutex is poisoned.
///
/// WARN: this function should not allocate!
pub fn record_deallocation(ptr: *mut u8) {
    let mut guard = ALLOCATION_TRACKER.lock().unwrap();
    if matches!(guard.status, Status::Stopped) {
        return;
    }
    let Some(Allocation {
        size,
        callsite_hash,
    }) = guard.memory_locations.remove(&(ptr as usize))
    else {
        // this was allocated before the tracking started
        return;
    };
    // Plain lookups only: `get_mut` and `remove` never insert, so they cannot
    // grow the map. The entry API is insert-oriented and may reserve room for
    // a vacant slot, which is not acceptable in a function that must not
    // allocate.
    let Some(stat) = guard.callsite_statistics.get_mut(&callsite_hash) else {
        return;
    };
    stat.size = ByteSize(stat.size.0.saturating_sub(size.0));
    stat.count = stat.count.saturating_sub(1);
    if stat.count == 0 {
        guard.callsite_statistics.remove(&callsite_hash);
    }
}

#[cfg(test)]
mod tests {

    use super::*;

    /// Walks one call site across the reporting threshold, frees one of its
    /// allocations, and checks that a second call site starts from scratch.
    #[test]
    #[serial_test::file_serial]
    fn test_record_allocation_and_deallocation() {
        init(2000);
        let callsite_hash_1 = 777;

        let ptr_1 = 0x1 as *mut u8;
        let response = record_allocation(callsite_hash_1, 1500, ptr_1);
        assert!(matches!(
            response,
            AllocRecordingResponse::ThresholdNotExceeded
        ));

        let ptr_2 = 0x2 as *mut u8;
        let response = record_allocation(callsite_hash_1, 1500, ptr_2);
        let AllocRecordingResponse::ThresholdExceeded(statistic) = response else {
            panic!("Expected ThresholdExceeded response");
        };
        assert_eq!(statistic.count, 2);
        assert_eq!(statistic.size, ByteSize(3000));
        assert_eq!(statistic.last_report, ByteSize(0));

        record_deallocation(ptr_2);

        // the threshold was already crossed
        let ptr_3 = 0x3 as *mut u8;
        let response = record_allocation(callsite_hash_1, 1500, ptr_3);
        assert!(matches!(
            response,
            AllocRecordingResponse::ThresholdNotExceeded
        ));

        // this is a brand new call site with different statistics; it uses its
        // own address so that the entry still tracked for `ptr_3` is not
        // silently overwritten
        let callsite_hash_2 = 42;
        let ptr_4 = 0x4 as *mut u8;
        let response = record_allocation(callsite_hash_2, 1500, ptr_4);
        assert!(matches!(
            response,
            AllocRecordingResponse::ThresholdNotExceeded
        ));
    }

    /// Fills `memory_locations` to capacity and checks that the next
    /// allocation is rejected without growing the map.
    #[test]
    #[serial_test::file_serial]
    fn test_tracker_full() {
        init(1024 * 1024 * 1024);
        let memory_locations_capacity = ALLOCATION_TRACKER
            .lock()
            .unwrap()
            .memory_locations
            .capacity();

        for i in 0..memory_locations_capacity {
            let ptr = (i + 1) as *mut u8;
            let response = record_allocation(777, 10, ptr);
            assert!(matches!(
                response,
                AllocRecordingResponse::ThresholdNotExceeded
            ));
        }
        let response = record_allocation(777, 10, (memory_locations_capacity + 1) as *mut u8);
        assert!(matches!(
            response,
            AllocRecordingResponse::TrackerFull("memory_locations")
        ));
        // make sure that the map didn't grow
        let current_memory_locations_capacity = ALLOCATION_TRACKER
            .lock()
            .unwrap()
            .memory_locations
            .capacity();
        assert_eq!(current_memory_locations_capacity, memory_locations_capacity);
    }
}
Loading