diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index c0b878ecaca..af7b2b0a676 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -6585,6 +6585,7 @@ dependencies = [
  "anyhow",
  "async-speed-limit",
  "async-trait",
+ "backtrace",
  "bytesize",
  "coarsetime",
  "dyn-clone",
@@ -6610,6 +6611,8 @@ dependencies = [
  "sysinfo",
  "tempfile",
  "thiserror 1.0.69",
+ "tikv-jemalloc-ctl",
+ "tikv-jemallocator",
  "tokio",
  "tokio-metrics",
  "tokio-stream",
@@ -7203,6 +7206,7 @@ dependencies = [
  "tantivy",
  "tantivy-fst",
  "thiserror 1.0.69",
+ "tikv-jemalloc-ctl",
  "tokio",
  "tokio-stream",
  "tower 0.4.13",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index 769591e457e..1af321c3300 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -85,6 +85,7 @@ assert-json-diff = "2"
 async-compression = { version = "0.4", features = ["tokio", "gzip"] }
 async-speed-limit = "0.4"
 async-trait = "0.1"
+backtrace = "0.3"
 base64 = "0.22"
 binggan = { version = "0.14" }
 bytes = { version = "1", features = ["serde"] }
diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml
index 41a8fdce5e0..524e77fc8fd 100644
--- a/quickwit/quickwit-cli/Cargo.toml
+++ b/quickwit/quickwit-cli/Cargo.toml
@@ -81,6 +81,10 @@ quickwit-storage = { workspace = true, features = ["testsuite"] }
 
 [features]
 jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"]
+jemalloc-profiled = [
+  "quickwit-common/jemalloc-profiled",
+  "quickwit-serve/jemalloc-profiled"
+]
 ci-test = []
 pprof = ["quickwit-serve/pprof"]
 openssl-support = ["openssl-probe"]
@@ -127,6 +131,10 @@ release-macos-feature-vendored-set = [
   "quickwit-metastore/postgres",
   "quickwit-doc-mapper/multilang",
 ]
+release-heap-profiled = [
+  "release-feature-set",
+  "jemalloc-profiled"
+]
 
 [package.metadata.cargo-machete]
 # used to enable the `multilang` feature
diff --git a/quickwit/quickwit-cli/src/jemalloc.rs b/quickwit/quickwit-cli/src/jemalloc.rs
index 71969f79909..e5223e5ee31 100644
--- a/quickwit/quickwit-cli/src/jemalloc.rs
+++ b/quickwit/quickwit-cli/src/jemalloc.rs
@@ -19,6 +19,10 @@ use tikv_jemallocator::Jemalloc;
 use tracing::error;
 
 #[global_allocator]
+#[cfg(feature = "jemalloc-profiled")]
+pub static GLOBAL: quickwit_common::jemalloc_profiled::JemallocProfiled =
+    quickwit_common::jemalloc_profiled::JemallocProfiled(Jemalloc);
+#[cfg(not(feature = "jemalloc-profiled"))]
 pub static GLOBAL: Jemalloc = Jemalloc;
 
 const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1);
diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index 0cc409e38cb..576f2bb90ba 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -14,6 +14,7 @@
 anyhow = { workspace = true }
 async-speed-limit = { workspace = true }
 async-trait = { workspace = true }
+backtrace = { workspace = true, optional = true }
 bytesize = { workspace = true }
 coarsetime = { workspace = true }
 dyn-clone = { workspace = true }
@@ -37,6 +38,8 @@ siphasher = { workspace = true }
 sysinfo = { workspace = true }
 tempfile = { workspace = true }
 thiserror = { workspace = true }
+tikv-jemallocator = { workspace = true, optional = true }
+tikv-jemalloc-ctl = { workspace = true, optional = true }
 tokio = { workspace = true }
 tokio-metrics = { workspace = true }
 tokio-stream = { workspace = true }
@@ -47,6 +50,11 @@ tracing = { workspace = true }
 [features]
 testsuite = []
 named_tasks = ["tokio/tracing"]
+jemalloc-profiled = [
+  "dep:backtrace",
+  "dep:tikv-jemallocator",
+  "dep:tikv-jemalloc-ctl"
+]
 
 [dev-dependencies]
 serde_json = { workspace = true }
diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs
new file mode 100644
index 00000000000..4f4e4d06269
--- /dev/null
+++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs
@@ -0,0 +1,330 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::alloc::{GlobalAlloc, Layout};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicUsize, Ordering};
+
+use serde::Serialize;
+use tikv_jemallocator::Jemalloc;
+use tracing::info;
+
+pub const DEFAULT_MIN_ALLOC_SIZE_FOR_BACKTRACE: usize = 256 * 1024;
+
+// commands
+static MIN_ALLOC_SIZE_FOR_BACKTRACE: AtomicUsize =
+    AtomicUsize::new(DEFAULT_MIN_ALLOC_SIZE_FOR_BACKTRACE);
+static ENABLED: AtomicBool = AtomicBool::new(false);
+
+// stats, actual allocations might be larger for alignment reasons
+static ALLOC_COUNT: AtomicUsize = AtomicUsize::new(0);
+static ALLOC_ZEROED_COUNT: AtomicUsize = AtomicUsize::new(0);
+static ALLOC_SIZE: AtomicUsize = AtomicUsize::new(0);
+/// total size of allocations large enough to be printed
+static ALLOC_PRINTED_SIZE: AtomicUsize = AtomicUsize::new(0);
+static DEALLOC_COUNT: AtomicUsize = AtomicUsize::new(0);
+static DEALLOC_SIZE: AtomicUsize = AtomicUsize::new(0);
+static REALLOC_COUNT: AtomicUsize = AtomicUsize::new(0);
+static REALLOC_GROW_SIZE: AtomicUsize = AtomicUsize::new(0);
+static REALLOC_SHRINK_SIZE: AtomicUsize = AtomicUsize::new(0);
+/// total size of reallocations large enough to be printed
+static REALLOC_PRINTED_SIZE: AtomicI64 = AtomicI64::new(0);
+
+#[derive(Debug, Serialize)]
+pub struct JemallocProfile {
+    pub alloc_count: usize,
+    pub alloc_size: usize,
+    pub alloc_printed_size: usize,
+    pub alloc_zeroed_count: usize,
+    pub dealloc_count: usize,
+    pub dealloc_size: usize,
+    pub realloc_count: usize,
+    pub realloc_shrink_size: usize,
+    pub realloc_grow_size: usize,
+    pub realloc_printed_size: i64,
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error("profiling unavailable")]
+pub struct Unavailable;
+
+pub fn start_profiling(min_alloc_size_for_backtrace: usize) -> Result<(), Unavailable> {
+    warmup_symbol_cache();
+    configure_min_alloc_size_for_backtrace(min_alloc_size_for_backtrace);
+    info!(min_alloc_size_for_backtrace, "start heap profiling");
+    let profiling_previously_enabled = ENABLED.swap(true, Ordering::Acquire);
+    if profiling_previously_enabled {
+        info!("heap profiling already running");
+        return Err(Unavailable);
+    }
+    ALLOC_COUNT.store(0, Ordering::SeqCst);
+    ALLOC_SIZE.store(0, Ordering::SeqCst);
+    ALLOC_ZEROED_COUNT.store(0, Ordering::SeqCst);
+    DEALLOC_COUNT.store(0, Ordering::SeqCst);
+    DEALLOC_SIZE.store(0, Ordering::SeqCst);
+    REALLOC_COUNT.store(0, Ordering::SeqCst);
+    REALLOC_GROW_SIZE.store(0, Ordering::SeqCst);
+    REALLOC_SHRINK_SIZE.store(0, Ordering::SeqCst);
+    Ok(())
+}
+
+pub fn stop_profiling() -> Result<JemallocProfile, Unavailable> {
+    let profiling_previously_enabled = ENABLED.swap(false, Ordering::Release);
+    if !profiling_previously_enabled {
+        return Err(Unavailable);
+    }
+    info!("end heap profiling");
+    backtrace::clear_symbol_cache();
+    Ok(JemallocProfile {
+        alloc_count: ALLOC_COUNT.load(Ordering::SeqCst),
+        alloc_size: ALLOC_SIZE.load(Ordering::SeqCst),
+        alloc_printed_size: ALLOC_PRINTED_SIZE.load(Ordering::SeqCst),
+        alloc_zeroed_count: ALLOC_ZEROED_COUNT.load(Ordering::SeqCst),
+        dealloc_count: DEALLOC_COUNT.load(Ordering::SeqCst),
+        dealloc_size: DEALLOC_SIZE.load(Ordering::SeqCst),
+        realloc_count: REALLOC_COUNT.load(Ordering::SeqCst),
+        realloc_grow_size: REALLOC_GROW_SIZE.load(Ordering::SeqCst),
+        realloc_shrink_size: REALLOC_SHRINK_SIZE.load(Ordering::SeqCst),
+        realloc_printed_size: REALLOC_PRINTED_SIZE.load(Ordering::SeqCst),
+    })
+}
+
+fn configure_min_alloc_size_for_backtrace(min_alloc_size_for_backtrace: usize) {
+    MIN_ALLOC_SIZE_FOR_BACKTRACE.store(min_alloc_size_for_backtrace, Ordering::Relaxed);
+}
+
+/// Calls backtrace once to warmup symbolization allocations (~30MB)
+fn warmup_symbol_cache() {
+    backtrace::trace(|frame| {
+        backtrace::resolve_frame(frame, |_| {});
+        true
+    });
+}
+
+/// Wraps Jemalloc global allocator calls with tracking routines.
+///
+/// The tracking routines are called only when [ENABLED]
+/// is set to true, but we don't enforce any synchronization (we load it
+/// with Ordering::Relaxed) because it's fine to miss or record extra
+/// allocation events.
+pub struct JemallocProfiled(pub Jemalloc);
+
+unsafe impl GlobalAlloc for JemallocProfiled {
+    #[inline]
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        if ENABLED.load(Ordering::Relaxed) {
+            track_alloc_call(layout);
+        }
+        self.0.alloc(layout)
+    }
+
+    #[inline]
+    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
+        if ENABLED.load(Ordering::Relaxed) {
+            track_alloc_zeroed_call(layout);
+        }
+        self.0.alloc_zeroed(layout)
+    }
+
+    #[inline]
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        if ENABLED.load(Ordering::Relaxed) {
+            track_dealloc_call(layout);
+        }
+        self.0.dealloc(ptr, layout)
+    }
+
+    #[inline]
+    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+        if ENABLED.load(Ordering::Relaxed) {
+            track_realloc_call(layout, new_size);
+        }
+        self.0.realloc(ptr, layout, new_size)
+    }
+}
+
+#[cold]
+fn track_alloc_call(layout: Layout) {
+    if layout.size() > MIN_ALLOC_SIZE_FOR_BACKTRACE.load(Ordering::Relaxed) {
+        ALLOC_PRINTED_SIZE.fetch_add(layout.size(), Ordering::Relaxed);
+        dump_trace("alloc", layout.size());
+    }
+    ALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
+    ALLOC_SIZE.fetch_add(layout.size(), Ordering::Relaxed);
+}
+
+#[cold]
+fn track_alloc_zeroed_call(layout: Layout) {
+    ALLOC_ZEROED_COUNT.fetch_add(1, Ordering::Relaxed);
+    ALLOC_SIZE.fetch_add(layout.size(), Ordering::Relaxed);
+}
+
+#[cold]
+fn track_dealloc_call(layout: Layout) {
+    DEALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
+    DEALLOC_SIZE.fetch_add(layout.size(), Ordering::Relaxed);
+}
+
+#[cold]
+fn track_realloc_call(current_layout: Layout, new_size: usize) {
+    let alloc_delta = new_size as i64 - current_layout.size() as i64;
+    if alloc_delta > MIN_ALLOC_SIZE_FOR_BACKTRACE.load(Ordering::Relaxed) as i64 {
+        REALLOC_PRINTED_SIZE.fetch_add(alloc_delta, Ordering::Relaxed);
+        dump_trace("realloc", alloc_delta as usize);
+    }
+    REALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
+    if alloc_delta > 0 {
+        REALLOC_GROW_SIZE.fetch_add(alloc_delta as usize, Ordering::Relaxed);
+    } else {
+        REALLOC_SHRINK_SIZE.fetch_add(-alloc_delta as usize, Ordering::Relaxed);
+    }
+}
+
+/// Calling this function when allocating can trigger a recursive call (and
+/// stack overflow) if [backtrace] is first called after the tracking starts.
+/// Indeed, [backtrace] grows a pretty big allocation to cache all the symbols
+/// (somewhere in backtrace-rs/symbolize/gimli/macho.rs).
+fn dump_trace(kind: &'static str, alloc_size: usize) {
+    let mut profiling_frames_skipped = false;
+    let mut remaining_frames_to_inspect = 100;
+    let mut logged = false;
+    backtrace::trace(|frame| {
+        if remaining_frames_to_inspect == 0 {
+            println!("{},{},", kind, alloc_size);
+            logged = true;
+            return false;
+        } else {
+            remaining_frames_to_inspect -= 1;
+        }
+
+        if !profiling_frames_skipped {
+            backtrace::resolve_frame(frame, |symbol| {
+                if let Some(symbol_name) = symbol.name() {
+                    if prefix_helper::is_prefix("quickwit_common::jemalloc_profiled", &symbol_name)
+                    {
+                        profiling_frames_skipped = true;
+                    }
+                }
+            });
+            return true;
+        }
+        let mut keep_going = true;
+        backtrace::resolve_frame(frame, |symbol| {
+            if let Some(symbol_name) = symbol.name() {
+                for prefix in ["quickwit", "chitchat", "tantivy"].iter() {
+                    if prefix_helper::is_prefix(prefix, &symbol_name) {
+                        println!("{},{},{}", kind, alloc_size, symbol_name);
+                        logged = true;
+                        keep_going = false;
+                        return;
+                    }
+                }
+            }
+        });
+        keep_going
+    });
+    if !logged {
+        println!("{},", kind);
+    }
+}
+
+/// Helper that enables checking prefix matches on types implementing Display
+/// (e.g SymbolName) without any extra allocation.
+///
+/// The matcher also discards the first character of the string if it is a '<'.
+mod prefix_helper {
+    use std::fmt::{Display, Write};
+
+    pub fn is_prefix(prefix: &'static str, value: impl Display) -> bool {
+        let mut comp_write = PrefixWriteMatcher::new(prefix);
+        write!(&mut comp_write, "{}", value).unwrap();
+        comp_write.matched()
+    }
+
+    struct PrefixWriteMatcher {
+        prefix: &'static str,
+        chars_matched: usize,
+        failed: bool,
+    }
+
+    impl PrefixWriteMatcher {
+        fn new(prefix: &'static str) -> Self {
+            Self {
+                prefix,
+                chars_matched: 0,
+                failed: false,
+            }
+        }
+
+        fn matched(&self) -> bool {
+            self.chars_matched == self.prefix.len()
+        }
+    }
+
+    impl Write for PrefixWriteMatcher {
+        fn write_str(&mut self, mut new_slice: &str) -> std::fmt::Result {
+            if self.matched() || self.failed {
+                return Ok(());
+            }
+            if new_slice.starts_with('<') && self.chars_matched == 0 {
+                new_slice = &new_slice[1..];
+            }
+            let max_idx = (self.chars_matched + new_slice.len()).min(self.prefix.len());
+            let matched_prefix_slice = &self.prefix[self.chars_matched..max_idx];
+            if matched_prefix_slice != &new_slice[..matched_prefix_slice.len()] {
+                self.failed = true;
+            } else {
+                self.chars_matched += matched_prefix_slice.len();
+            }
+            Ok(())
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use itertools::Itertools;
+
+        use super::*;
+
+        #[test]
+        fn test_comparison_write() {
+            assert!(!is_prefix("hello", ""));
+            assert!(is_prefix("hello", "hello"));
+            assert!(is_prefix("hello", "hello world"));
+            assert!(!is_prefix("hello", "hell"));
+            assert!(!is_prefix("hello", ["h", "e"].iter().format("")));
+            assert!(is_prefix(
+                "hello",
+                ["h", "e", "l", "l", "o"].iter().format("")
+            ));
+            assert!(is_prefix(
+                "hello",
+                ["h", "e", "l", "l", "o", "!"].iter().format("")
+            ));
+            assert!(!is_prefix("hello", ["h", "i"].iter().format("")));
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_profiling() {
+        start_profiling(1024).unwrap();
+        start_profiling(2048).unwrap_err();
+        stop_profiling().unwrap();
+        stop_profiling().unwrap_err();
+    }
+}
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 67ea089f8f1..511ed2729ee 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -19,6 +19,8 @@ mod coolid;
 pub mod binary_heap;
 pub mod fs;
 pub mod io;
+#[cfg(feature = "jemalloc-profiled")]
+pub mod jemalloc_profiled;
 mod kill_switch;
 pub mod metrics;
 pub mod net;
diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml
index 820d3d49891..0d0173a134d 100644
--- a/quickwit/quickwit-search/Cargo.toml
+++ b/quickwit/quickwit-search/Cargo.toml
@@ -31,6 +31,7 @@ serde_json_borrow = { workspace = true }
 tantivy = { workspace = true }
 tantivy-fst = { workspace = true }
 thiserror = { workspace = true }
+tikv-jemalloc-ctl = { workspace = true }
 tokio = { workspace = true }
 tokio-stream = { workspace = true }
 tower = { workspace = true }
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index 6b641f8364a..250d42fc26e 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -432,6 +432,56 @@ fn compute_index_size(hot_directory: &HotDirectory) -> ByteSize {
     ByteSize(size_bytes)
 }
 
+fn print_jemalloc(callsite: &'static str) {
+    let epoch_mib = tikv_jemalloc_ctl::epoch::mib().unwrap();
+    let resident_mib = tikv_jemalloc_ctl::stats::resident::mib().unwrap();
+    let allocated_mib = tikv_jemalloc_ctl::stats::allocated::mib().unwrap();
+    epoch_mib.advance().unwrap();
+    println!(
+        "(dbg) {} [res={}MB,alloc={}MB]",
+        callsite,
+        resident_mib.read().unwrap() / 1024 / 1024,
+        allocated_mib.read().unwrap() / 1024 / 1024,
+    );
+}
+
+fn print_caches(callsite: &'static str) {
+    println!(
+        "(dbg) {} shortlived({:?}/{:?}MB) fastfield({:?}/{:?}MB) splitfooter({:?}/{:?}MB)",
+        callsite,
+        quickwit_storage::STORAGE_METRICS
+            .shortlived_cache
+            .in_cache_count
+            .get(),
+        quickwit_storage::STORAGE_METRICS
+            .shortlived_cache
+            .in_cache_num_bytes
+            .get()
+            / 1024
+            / 1024,
+        quickwit_storage::STORAGE_METRICS
+            .fast_field_cache
+            .in_cache_count
+            .get(),
+        quickwit_storage::STORAGE_METRICS
+            .fast_field_cache
+            .in_cache_num_bytes
+            .get()
+            / 1024
+            / 1024,
+        quickwit_storage::STORAGE_METRICS
+            .split_footer_cache
+            .in_cache_count
+            .get(),
+        quickwit_storage::STORAGE_METRICS
+            .split_footer_cache
+            .in_cache_num_bytes
+            .get()
+            / 1024
+            / 1024
+    );
+}
+
 /// Apply a leaf search on a single split.
 #[allow(clippy::too_many_arguments)]
 async fn leaf_search_single_split(
@@ -502,6 +552,13 @@ async fn leaf_search_single_split(
 
     let warmup_start = Instant::now();
     warmup(&searcher, &warmup_info).await?;
+    print_jemalloc("after warmup");
+    // println!(
+    //     "(dbg) shortlived cache totalling {}MB",
+    //     byte_range_cache.get_num_bytes() / 1024 / 1024
+    // );
+    print_caches("after_warmup");
+
     let warmup_end = Instant::now();
     let warmup_duration: Duration = warmup_end.duration_since(warmup_start);
     let warmup_size = ByteSize(byte_range_cache.get_num_bytes());
@@ -1333,6 +1390,9 @@ pub async fn leaf_search(
     doc_mapper: Arc<DocMapper>,
     aggregations_limits: AggregationLimitsGuard,
 ) -> Result<LeafSearchResponse> {
+    println!("index_storage={index_storage:?}");
+    print_jemalloc("leaf_search start");
+    print_caches("leaf_search start");
     let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum();
     let num_splits = splits.len();
     let current_span = tracing::Span::current();
@@ -1455,6 +1515,13 @@ pub async fn leaf_search(
         .leaf_search_targeted_splits
         .with_label_values(label_values)
         .observe(num_splits as f64);
+    print_jemalloc("leaf_search end");
+
+    tokio::spawn(async {
+        tokio::time::sleep(Duration::from_secs(1)).await;
+        print_jemalloc("1s after leaf_search");
+        print_caches("1s after leaf_search");
+    });
 
     Ok(leaf_search_response_reresult??)
 }
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index 717554b975d..22f5603c6d2 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -506,7 +506,7 @@ impl SearcherContext {
         let split_stream_semaphore =
             Semaphore::new(searcher_config.max_num_concurrent_split_streams);
         let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;
-        let storage_long_term_cache = Arc::new(QuickwitCache::new(fast_field_cache_capacity));
+        let fast_fields_cache = Arc::new(QuickwitCache::new_fast_fields(fast_field_cache_capacity));
         let leaf_search_cache =
             LeafSearchCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize);
         let list_fields_cache =
@@ -518,7 +518,7 @@ impl SearcherContext {
 
         Self {
             searcher_config,
-            fast_fields_cache: storage_long_term_cache,
+            fast_fields_cache,
             search_permit_provider: leaf_search_split_semaphore,
             split_footer_cache: global_split_footer_cache,
             split_stream_semaphore,
diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml
index 716c5f05a35..0f681301d9b 100644
--- a/quickwit/quickwit-serve/Cargo.toml
+++ b/quickwit/quickwit-serve/Cargo.toml
@@ -103,6 +103,9 @@ quickwit-storage = { workspace = true, features = ["testsuite"] }
 pprof = [
   "dep:pprof"
 ]
+jemalloc-profiled = [
+  "quickwit-common/jemalloc-profiled"
+]
 testsuite = []
 sqs-for-tests = [
   "quickwit-indexing/sqs",
diff --git a/quickwit/quickwit-serve/src/developer_api/heap_prof.rs b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs
new file mode 100644
index 00000000000..e5d4ff0be24
--- /dev/null
+++ b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs
@@ -0,0 +1,82 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use quickwit_common::jemalloc_profiled::{
+    start_profiling, stop_profiling, DEFAULT_MIN_ALLOC_SIZE_FOR_BACKTRACE,
+};
+use serde::Deserialize;
+use warp::reply::Reply;
+use warp::Filter;
+
+pub fn heap_prof_handlers(
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+    #[derive(Deserialize)]
+    struct ProfilerQueryParams {
+        duration: Option<u64>,
+        backtrace_alloc_size: Option<usize>,
+    }
+
+    let start_profiler = {
+        warp::path!("heap-prof" / "start")
+            .and(warp::query::<ProfilerQueryParams>())
+            .and_then(move |params: ProfilerQueryParams| start_profiler_handler(params))
+    };
+
+    let stop_profiler = { warp::path!("heap-prof" / "stop").and_then(stop_profiler_handler) };
+
+    async fn start_profiler_handler(
+        params: ProfilerQueryParams,
+    ) -> Result<warp::reply::Response, warp::Rejection> {
+        let min_alloc_size_for_backtrace = params
+            .backtrace_alloc_size
+            .unwrap_or(DEFAULT_MIN_ALLOC_SIZE_FOR_BACKTRACE);
+        if start_profiling(min_alloc_size_for_backtrace).is_err() {
+            return Ok(warp::reply::with_status(
+                "Heap profiling already running",
+                warp::http::StatusCode::SERVICE_UNAVAILABLE,
+            )
+            .into_response());
+        }
+        if let Some(duration_secs) = params.duration {
+            tokio::time::sleep(Duration::from_secs(duration_secs)).await;
+            stop_profiler_handler().await
+        } else {
+            Ok(
+                warp::reply::with_status("Heap profiling started", warp::http::StatusCode::OK)
+                    .into_response(),
+            )
+        }
+    }
+
+    async fn stop_profiler_handler() -> Result<warp::reply::Response, warp::Rejection> {
+        if let Ok(data) = stop_profiling() {
+            Ok(warp::reply::with_header(
+                serde_json::to_string_pretty(&data).unwrap(),
+                "Content-Type",
+                "application/json",
+            )
+            .into_response())
+        } else {
+            Ok(warp::reply::with_status(
+                "Heap profiling not running",
+                warp::http::StatusCode::SERVICE_UNAVAILABLE,
+            )
+            .into_response())
+        }
+    }
+
+    start_profiler.or(stop_profiler)
+}
diff --git a/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs b/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs
new file mode 100644
index 00000000000..81be36cd318
--- /dev/null
+++ b/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs
@@ -0,0 +1,29 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use warp::Filter;
+
+fn not_implemented_handler() -> impl warp::Reply {
+    warp::reply::with_status(
+        "Quickwit was compiled without the `jemalloc-profiled` feature",
+        warp::http::StatusCode::NOT_IMPLEMENTED,
+    )
+}
+
+pub fn heap_prof_handlers(
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+    let start_profiler = { warp::path!("heap-prof" / "start").map(not_implemented_handler) };
+    let stop_profiler = { warp::path!("heap-prof" / "stop").map(not_implemented_handler) };
+    start_profiler.or(stop_profiler)
+}
diff --git a/quickwit/quickwit-serve/src/developer_api/mod.rs b/quickwit/quickwit-serve/src/developer_api/mod.rs
index d58620a2670..0c0838ea8a4 100644
--- a/quickwit/quickwit-serve/src/developer_api/mod.rs
+++ b/quickwit/quickwit-serve/src/developer_api/mod.rs
@@ -13,14 +13,16 @@
 // limitations under the License.
 
 mod debug;
-mod log_level;
+#[cfg_attr(not(feature = "jemalloc-profiled"), path = "heap_prof_disabled.rs")]
+mod heap_prof;
+mod log_level;
 
 #[cfg_attr(not(feature = "pprof"), path = "pprof_disabled.rs")]
 mod pprof;
-
 mod server;
 
 use debug::debug_handler;
+use heap_prof::heap_prof_handlers;
 use log_level::log_level_handler;
 use pprof::pprof_handlers;
 use quickwit_cluster::Cluster;
@@ -42,7 +44,8 @@ pub(crate) fn developer_api_routes(
         .and(
             debug_handler(cluster.clone())
                 .or(log_level_handler(env_filter_reload_fn.clone()).boxed())
-                .or(pprof_handlers()),
+                .or(pprof_handlers())
+                .or(heap_prof_handlers()),
         )
         .recover(recover_fn)
 }
diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
index 357a195c16b..89b8b458e53 100644
--- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
+++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
@@ -14,6 +14,7 @@
 use std::borrow::{Borrow, Cow};
 use std::collections::BTreeMap;
+use std::fmt::Debug;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -23,6 +24,8 @@ use tantivy::directory::OwnedBytes;
 
 use crate::metrics::CacheMetrics;
 
+// static BYTE_RANGE_CACHE_COUNT: AtomicU64 = AtomicU64::new(0);
+
 #[derive(Clone, PartialOrd, Ord, PartialEq, Eq)]
 struct CacheKey<'a, T: ToOwned + ?Sized> {
     tag: Cow<'a, T>,
@@ -61,8 +64,13 @@ struct NeedMutByteRangeCache<T: 'static + ToOwned + ?Sized> {
     cache_counters: &'static CacheMetrics,
 }
 
-impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
+impl<T: 'static + ToOwned + ?Sized + Ord + Debug> NeedMutByteRangeCache<T> {
     fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self {
+        // let prev_cache_count = BYTE_RANGE_CACHE_COUNT.fetch_add(1, Ordering::SeqCst);
+        // println!(
+        //     "(dbg) ByteRangeCache created, now have {}",
+        //     prev_cache_count + 1
+        // );
         NeedMutByteRangeCache {
             cache: BTreeMap::new(),
             num_items: 0,
@@ -225,6 +233,13 @@ impl<T: 'static + ToOwned + ?Sized + Ord + Debug> NeedMutByteRangeCache<T> {
             .filter(|(k, v)| k.tag == query.tag && range_end <= v.range_end)
     }
 
+    fn get_true_size(&self) -> u64 {
+        self.cache
+            .iter()
+            .map(|(_, v)| v.bytes.len() as u64)
+            .sum::<u64>()
+    }
+
     /// Try to merge all blocks in the given range. Fails if some bytes were not already stored.
     fn merge_ranges<'a>(
         &mut self,
@@ -301,6 +316,7 @@ impl<T: 'static + ToOwned + ?Sized + Ord + Debug> NeedMutByteRangeCache<T> {
     fn update_counter_record_item(&mut self, num_bytes: usize) {
         self.num_items += 1;
         self.num_bytes += num_bytes as u64;
+        assert_eq!(self.get_true_size(), self.num_bytes);
         self.cache_counters.in_cache_count.inc();
         self.cache_counters.in_cache_num_bytes.add(num_bytes as i64);
     }
@@ -317,6 +333,11 @@ impl<T: 'static + ToOwned + ?Sized + Ord + Debug> NeedMutByteRangeCache<T> {
 
 impl<T: 'static + ToOwned + ?Sized + Ord> Drop for NeedMutByteRangeCache<T> {
     fn drop(&mut self) {
+        // let prev_cache_count = BYTE_RANGE_CACHE_COUNT.fetch_sub(1, Ordering::SeqCst);
+        // println!(
+        //     "(dbg) ByteRangeCache dropped, now have {}",
+        //     prev_cache_count - 1
+        // );
         self.cache_counters
             .in_cache_count
             .sub(self.num_items as i64);
@@ -558,4 +579,34 @@ mod tests {
             assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
         }
     }
+
+    #[test]
+    fn test_metrics_multiple_byte_range_caches() {
+        // we need to get a 'static ref to metrics, and want a dedicated metrics because we assert
+        // on it
+        static METRICS: Lazy<CacheMetrics> =
+            Lazy::new(|| CacheMetrics::for_component("byterange_cache_test"));
+
+        let cache_1 = ByteRangeCache::with_infinite_capacity(&METRICS);
+
+        let cache_2 = ByteRangeCache::with_infinite_capacity(&METRICS);
+
+        let key: std::path::PathBuf = "key".into();
+
+        assert_eq!(METRICS.in_cache_num_bytes.get(), 0);
+        cache_1.put_slice(
+            key.clone(),
+            0..5,
+            OwnedBytes::new((0..5).collect::<Vec<u8>>()),
+        );
+        assert_eq!(METRICS.in_cache_num_bytes.get(), 5);
+        cache_2.put_slice(
+            key.clone(),
+            0..10,
+            OwnedBytes::new((0..10).collect::<Vec<u8>>()),
+        );
+        assert_eq!(METRICS.in_cache_num_bytes.get(), 15);
+        drop(cache_1);
+        assert_eq!(METRICS.in_cache_num_bytes.get(), 10);
+    }
 }
diff --git a/quickwit/quickwit-storage/src/cache/quickwit_cache.rs b/quickwit/quickwit-storage/src/cache/quickwit_cache.rs
index aa34202773d..ad4273e9ecc 100644
--- a/quickwit/quickwit-storage/src/cache/quickwit_cache.rs
+++ b/quickwit/quickwit-storage/src/cache/quickwit_cache.rs
@@ -39,7 +39,7 @@ impl From<Vec<(String, Arc<dyn Cache>)>> for QuickwitCache {
 impl QuickwitCache {
     /// Creates a [`QuickwitCache`] with a cache on fast fields
     /// with a capacity of `fast_field_cache_capacity`.
-    pub fn new(fast_field_cache_capacity: usize) -> Self {
+    pub fn new_fast_fields(fast_field_cache_capacity: usize) -> Self {
         let mut quickwit_cache = QuickwitCache::empty();
         let fast_field_cache_counters: &'static CacheMetrics =
             &crate::STORAGE_METRICS.fast_field_cache;