diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index d7f4eee6c63..88e6f7ee978 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -6866,6 +6866,7 @@ dependencies = [
  "anyhow",
  "async-speed-limit",
  "async-trait",
+ "backtrace",
  "bytesize",
  "coarsetime",
  "dyn-clone",
@@ -6887,10 +6888,13 @@ dependencies = [
  "regex",
  "serde",
  "serde_json",
+ "serial_test",
  "siphasher 0.3.11",
  "sysinfo",
  "tempfile",
  "thiserror 1.0.69",
+ "tikv-jemalloc-ctl",
+ "tikv-jemallocator",
  "tokio",
  "tokio-metrics",
  "tokio-stream",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index 1f687c96d72..460dfb06756 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -85,6 +85,7 @@ assert-json-diff = "2"
 async-compression = { version = "0.4", features = ["tokio", "gzip"] }
 async-speed-limit = "0.4"
 async-trait = "0.1"
+backtrace = "0.3"
 base64 = "0.22"
 binggan = { version = "0.14" }
 bytes = { version = "1", features = ["serde"] }
diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml
index 41a8fdce5e0..524e77fc8fd 100644
--- a/quickwit/quickwit-cli/Cargo.toml
+++ b/quickwit/quickwit-cli/Cargo.toml
@@ -81,6 +81,10 @@ quickwit-storage = { workspace = true, features = ["testsuite"] }
 
 [features]
 jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"]
+jemalloc-profiled = [
+  "quickwit-common/jemalloc-profiled",
+  "quickwit-serve/jemalloc-profiled"
+]
 ci-test = []
 pprof = ["quickwit-serve/pprof"]
 openssl-support = ["openssl-probe"]
@@ -127,6 +131,10 @@ release-macos-feature-vendored-set = [
   "quickwit-metastore/postgres",
   "quickwit-doc-mapper/multilang",
 ]
+release-heap-profiled = [
+  "release-feature-set",
+  "jemalloc-profiled"
+]
 
 [package.metadata.cargo-machete]
 # used to enable the `multilang` feature
diff --git a/quickwit/quickwit-cli/src/jemalloc.rs b/quickwit/quickwit-cli/src/jemalloc.rs
index 71969f79909..e5223e5ee31 100644
--- a/quickwit/quickwit-cli/src/jemalloc.rs
+++ b/quickwit/quickwit-cli/src/jemalloc.rs
@@ -19,6 +19,11 @@ use tikv_jemallocator::Jemalloc;
 use tracing::error;
 
 #[global_allocator]
+#[cfg(feature = "jemalloc-profiled")]
+pub static GLOBAL: quickwit_common::jemalloc_profiled::JemallocProfiled =
+    quickwit_common::jemalloc_profiled::JemallocProfiled(Jemalloc);
+#[cfg(not(feature = "jemalloc-profiled"))]
+#[global_allocator]
 pub static GLOBAL: Jemalloc = Jemalloc;
 
 const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1);
diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index 3abcd35f3d5..5f0e7f1375b 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -14,6 +14,7 @@ license.workspace = true
 anyhow = { workspace = true }
 async-speed-limit = { workspace = true }
 async-trait = { workspace = true }
+backtrace = { workspace = true, optional = true }
 bytesize = { workspace = true }
 coarsetime = { workspace = true }
 dyn-clone = { workspace = true }
@@ -37,6 +38,8 @@ siphasher = { workspace = true }
 sysinfo = { workspace = true }
 tempfile = { workspace = true }
 thiserror = { workspace = true }
+tikv-jemallocator = { workspace = true, optional = true }
+tikv-jemalloc-ctl = { workspace = true, optional = true }
 tokio = { workspace = true }
 tokio-metrics = { workspace = true }
 tokio-stream = { workspace = true }
@@ -47,9 +50,15 @@ tracing = { workspace = true }
 [features]
 testsuite = []
 named_tasks = ["tokio/tracing"]
+jemalloc-profiled = [
+  "dep:backtrace",
+  "dep:tikv-jemallocator",
+  "dep:tikv-jemalloc-ctl"
+]
 
 [dev-dependencies]
 serde_json = { workspace = true }
 tempfile = { workspace = true }
 proptest = { workspace = true }
+serial_test = { workspace = true }
 tokio = { workspace = true, features = ["test-util"] }
diff --git a/quickwit/quickwit-common/src/alloc_tracker.rs b/quickwit/quickwit-common/src/alloc_tracker.rs
new file mode 100644
index 00000000000..655e5962e7d
--- /dev/null
+++ b/quickwit/quickwit-common/src/alloc_tracker.rs
@@ -0,0 +1,236 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::collections::hash_map::Entry;
+use std::sync::Mutex;
+
+use bytesize::ByteSize;
+use once_cell::sync::Lazy;
+
+static ALLOCATION_TRACKER: Lazy<Mutex<Allocations>> =
+    Lazy::new(|| Mutex::new(Allocations::default()));
+
+#[derive(Debug)]
+struct Allocation {
+    pub callsite_hash: u64,
+    pub size: ByteSize,
+}
+
+#[derive(Debug, Copy, Clone)]
+pub struct Statistic {
+    pub count: u64,
+    pub size: ByteSize,
+    pub last_report: ByteSize,
+}
+
+#[derive(Debug)]
+enum Status {
+    Started { reporting_interval: ByteSize },
+    Stopped,
+}
+
+/// WARN:
+/// - keys and values in these maps should not allocate!
+/// - we assume HashMaps don't allocate if their capacity is not exceeded
+#[derive(Debug)]
+struct Allocations {
+    memory_locations: HashMap<usize, Allocation>,
+    callsite_statistics: HashMap<u64, Statistic>,
+    status: Status,
+}
+
+impl Default for Allocations {
+    fn default() -> Self {
+        Self {
+            memory_locations: HashMap::with_capacity(128 * 1024),
+            callsite_statistics: HashMap::with_capacity(32 * 1024),
+            status: Status::Stopped,
+        }
+    }
+}
+
+pub fn init(reporting_interval_bytes: u64) {
+    let mut guard = ALLOCATION_TRACKER.lock().unwrap();
+    guard.memory_locations.clear();
+    guard.callsite_statistics.clear();
+    guard.status = Status::Started {
+        reporting_interval: ByteSize(reporting_interval_bytes),
+    }
+}
+
+pub enum AllocRecordingResponse {
+    ThresholdExceeded(Statistic),
+    ThresholdNotExceeded,
+    TrackerFull(&'static str),
+    NotStarted,
+}
+
+/// Records an allocation and occasionally reports the cumulative allocated size
+/// for the provided callsite_hash.
+///
+/// Every time the total allocated size for a given callsite_hash exceeds the
+/// previously reported value by at least reporting_interval, that allocated
+/// size is reported.
+///
+/// WARN: this function should not allocate!
+pub fn record_allocation(
+    callsite_hash: u64,
+    size_bytes: u64,
+    ptr: *mut u8,
+) -> AllocRecordingResponse {
+    let mut guard = ALLOCATION_TRACKER.lock().unwrap();
+    let Status::Started { reporting_interval } = guard.status else {
+        return AllocRecordingResponse::NotStarted;
+    };
+    if guard.memory_locations.capacity() == guard.memory_locations.len() {
+        return AllocRecordingResponse::TrackerFull("memory_locations");
+    }
+    if guard.callsite_statistics.capacity() == guard.callsite_statistics.len() {
+        return AllocRecordingResponse::TrackerFull("callsite_statistics");
+    }
+    guard.memory_locations.insert(
+        ptr as usize,
+        Allocation {
+            callsite_hash,
+            size: ByteSize(size_bytes),
+        },
+    );
+    let entry = guard
+        .callsite_statistics
+        .entry(callsite_hash)
+        .and_modify(|stat| {
+            stat.count += 1;
+            stat.size += size_bytes;
+        })
+        .or_insert(Statistic {
+            count: 1,
+            size: ByteSize(size_bytes),
+            last_report: ByteSize(0),
+        });
+    let new_threshold_exceeded = entry.size > (entry.last_report + reporting_interval);
+    if new_threshold_exceeded {
+        let reported_statistic = *entry;
+        entry.last_report = entry.size;
+        AllocRecordingResponse::ThresholdExceeded(reported_statistic)
+    } else {
+        AllocRecordingResponse::ThresholdNotExceeded
+    }
+}
+
+/// WARN: this function should not allocate!
+pub fn record_deallocation(ptr: *mut u8) {
+    let mut guard = ALLOCATION_TRACKER.lock().unwrap();
+    if let Status::Stopped = guard.status {
+        return;
+    }
+    let Some(Allocation {
+        size,
+        callsite_hash,
+        ..
+    }) = guard.memory_locations.remove(&(ptr as usize))
+    else {
+        // this was allocated before the tracking started
+        return;
+    };
+    if let Entry::Occupied(mut content) = guard.callsite_statistics.entry(callsite_hash) {
+        let new_size_bytes = content.get().size.0.saturating_sub(size.0);
+        let new_count = content.get().count.saturating_sub(1);
+        content.get_mut().count = new_count;
+        content.get_mut().size = ByteSize(new_size_bytes);
+        if content.get().count == 0 {
+            content.remove();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+
+    #[test]
+    #[serial_test::file_serial]
+    fn test_record_allocation_and_deallocation() {
+        init(2000);
+        let callsite_hash_1 = 777;
+
+        let ptr_1 = 0x1 as *mut u8;
+        let response = record_allocation(callsite_hash_1, 1500, ptr_1);
+        assert!(matches!(
+            response,
+            AllocRecordingResponse::ThresholdNotExceeded
+        ));
+
+        let ptr_2 = 0x2 as *mut u8;
+        let response = record_allocation(callsite_hash_1, 1500, ptr_2);
+        let AllocRecordingResponse::ThresholdExceeded(statistic) = response else {
+            panic!("Expected ThresholdExceeded response");
+        };
+        assert_eq!(statistic.count, 2);
+        assert_eq!(statistic.size, ByteSize(3000));
+        assert_eq!(statistic.last_report, ByteSize(0));
+
+        record_deallocation(ptr_2);
+
+        // the threshold was already crossed
+        let ptr_3 = 0x3 as *mut u8;
+        let response = record_allocation(callsite_hash_1, 1500, ptr_3);
+        assert!(matches!(
+            response,
+            AllocRecordingResponse::ThresholdNotExceeded
+        ));
+
+        // this is a brand new call site with different statistics
+        let callsite_hash_2 = 42;
+        let ptr_3 = 0x3 as *mut u8;
+        let response = record_allocation(callsite_hash_2, 1500, ptr_3);
+        assert!(matches!(
+            response,
+            AllocRecordingResponse::ThresholdNotExceeded
+        ));
+    }
+
+    #[test]
+    #[serial_test::file_serial]
+    fn test_tracker_full() {
+        init(1024 * 1024 * 1024);
+        let memory_locations_capacity = ALLOCATION_TRACKER
+            .lock()
+            .unwrap()
+            .memory_locations
+            .capacity();
+
+        for i in 0..memory_locations_capacity {
+            let ptr = (i + 1) as *mut u8;
+            let response = record_allocation(777, 10, ptr);
+            assert!(matches!(
+                response,
+                AllocRecordingResponse::ThresholdNotExceeded
+            ));
+        }
+        let response = record_allocation(777, 10, (memory_locations_capacity + 1) as *mut u8);
+        assert!(matches!(
+            response,
+            AllocRecordingResponse::TrackerFull("memory_locations")
+        ));
+        // make sure that the map didn't grow
+        let current_memory_locations_capacity = ALLOCATION_TRACKER
+            .lock()
+            .unwrap()
+            .memory_locations
+            .capacity();
+        assert_eq!(current_memory_locations_capacity, memory_locations_capacity);
+    }
+}
diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs
new file mode 100644
index 00000000000..495cdcf31cc
--- /dev/null
+++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs
@@ -0,0 +1,250 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::alloc::{GlobalAlloc, Layout};
+use std::hash::Hasher;
+use std::io::Write;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+
+use bytesize::ByteSize;
+use tikv_jemallocator::Jemalloc;
+use tracing::{error, info};
+
+use crate::alloc_tracker::{self, AllocRecordingResponse};
+
+const DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING: u64 = 64 * 1024;
+const DEFAULT_REPORTING_INTERVAL_BYTES: u64 = 1024 * 1024 * 1024;
+
+/// Atomics are used to communicate configurations between the start/stop
+/// endpoints and the JemallocProfiled allocator wrapper.
+///
+/// The flags are padded to avoid false sharing of the CPU cache line between
+/// threads. 128 bytes is a safe upper bound for the cache line size on x86_64
+/// and arm64.
+#[repr(align(128))]
+struct Flags {
+    /// The minimum allocation size that is recorded by the tracker.
+    min_alloc_bytes_for_profiling: AtomicU64,
+    /// Whether the profiling is started or not.
+    enabled: AtomicBool,
+    /// Padding to make sure we fill the cache line.
+    _padding: [u8; 119], // 128 (align) - 8 (u64) - 1 (bool)
+}
+
+static FLAGS: Flags = Flags {
+    min_alloc_bytes_for_profiling: AtomicU64::new(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING),
+    enabled: AtomicBool::new(false),
+    _padding: [0; 119],
+};
+
+/// Starts measuring heap allocations and logs important leaks.
+///
+/// This function uses a wrapper around the global Jemalloc allocator to
+/// instrument it.
+///
+/// Each time an allocation bigger than min_alloc_bytes_for_profiling is
+/// performed, it is recorded in a map and the statistics for its call site are
+/// updated. Tracking allocations is costly because it requires acquiring a
+/// global mutex. Setting a reasonable value for min_alloc_bytes_for_profiling
+/// is crucial. For instance, for a search aggregation request, tracking every
+/// allocation (min_alloc_bytes_for_profiling=1) is typically 100x slower than
+/// using a minimum of 64kB.
+///
+/// During profiling, the statistics per call site are used to log when
+/// specific thresholds are exceeded.
+/// For each call site, the allocated memory is logged (with a backtrace) every
+/// time it exceeds the last logged allocated memory by at least
+/// alloc_bytes_triggering_backtrace. This logging interval should usually be
+/// set to a value of at least 500MB to limit the logging verbosity.
+pub fn start_profiling(
+    min_alloc_bytes_for_profiling: Option<u64>,
+    alloc_bytes_triggering_backtrace: Option<u64>,
+) {
+    #[cfg(miri)]
+    tracing::warn!(
+        "heap profiling is not supported with Miri because in that case the `backtrace` crate \
+         allocates"
+    );
+
+    // Call backtrace once to warm up symbolization allocations (~30MB)
+    backtrace::trace(|frame| {
+        backtrace::resolve_frame(frame, |_| {});
+        true
+    });
+
+    let alloc_bytes_triggering_backtrace =
+        alloc_bytes_triggering_backtrace.unwrap_or(DEFAULT_REPORTING_INTERVAL_BYTES);
+    alloc_tracker::init(alloc_bytes_triggering_backtrace);
+
+    let min_alloc_bytes_for_profiling =
+        min_alloc_bytes_for_profiling.unwrap_or(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING);
+
+    // stdout() might allocate a buffer on first use. If the first allocation
+    // tracked comes from stdout, it will trigger a deadlock. Logging here
+    // guarantees that it doesn't happen.
+    info!(
+        min_alloc_for_profiling = %ByteSize(min_alloc_bytes_for_profiling),
+        alloc_triggering_backtrace = %ByteSize(alloc_bytes_triggering_backtrace),
+        "heap profiling running"
+    );
+
+    // Use strong ordering to make sure all threads see these changes in this order
+    FLAGS
+        .min_alloc_bytes_for_profiling
+        .store(min_alloc_bytes_for_profiling, Ordering::SeqCst);
+    FLAGS.enabled.store(true, Ordering::SeqCst);
+}
+
+/// Stops measuring heap allocations.
+///
+/// The allocation tracking tables and the symbol cache are not cleared.
+pub fn stop_profiling() {
+    // Use strong ordering to make sure all threads see these changes in this order
+    let previously_enabled = FLAGS.enabled.swap(false, Ordering::SeqCst);
+    FLAGS
+        .min_alloc_bytes_for_profiling
+        .store(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING, Ordering::SeqCst);
+
+    info!(previously_enabled, "heap profiling stopped");
+}
+
+/// Wraps the Jemalloc global allocator calls with tracking routines.
+///
+/// The tracking routines are only called when the `enabled` flag is set to true
+/// (by calling [start_profiling()]), but we don't enforce any synchronization
+/// (we load it with Ordering::Relaxed) because it's fine to miss or record
+/// extra allocation events.
+///
+/// It's important to ensure that no allocations are performed inside the allocator!
+pub struct JemallocProfiled(pub Jemalloc);
+
+unsafe impl GlobalAlloc for JemallocProfiled {
+    #[inline]
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        let ptr = unsafe { self.0.alloc(layout) };
+        if FLAGS.enabled.load(Ordering::Relaxed) {
+            track_alloc_call(ptr, layout);
+        }
+        ptr
+    }
+
+    #[inline]
+    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
+        let ptr = unsafe { self.0.alloc_zeroed(layout) };
+        if FLAGS.enabled.load(Ordering::Relaxed) {
+            track_alloc_call(ptr, layout);
+        }
+        ptr
+    }
+
+    #[inline]
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        if FLAGS.enabled.load(Ordering::Relaxed) {
+            track_dealloc_call(ptr, layout);
+        }
+        unsafe { self.0.dealloc(ptr, layout) }
+    }
+
+    #[inline]
+    unsafe fn realloc(&self, old_ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+        let new_ptr = unsafe { self.0.realloc(old_ptr, layout, new_size) };
+        if FLAGS.enabled.load(Ordering::Relaxed) {
+            track_realloc_call(old_ptr, new_ptr, layout, new_size);
+        }
+        new_ptr
+    }
+}
+
+/// Warning: stdout allocates a buffer on first use.
+#[inline]
+fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) {
+    {
+        let mut lock = std::io::stdout().lock();
+        let _ = writeln!(
+            &mut lock,
+            "htrk callsite={} allocs={} size={}",
+            callsite_hash, stat.count, stat.size
+        );
+        backtrace::trace(|frame| {
+            backtrace::resolve_frame(frame, |symbol| {
+                if let Some(symbol_name) = symbol.name() {
+                    let _ = writeln!(&mut lock, "{}", symbol_name);
+                } else {
+                    let _ = writeln!(&mut lock, "symb failed");
+                }
+            });
+            true
+        });
+    }
+}
+
+#[inline]
+fn backtrace_hash() -> u64 {
+    let mut hasher = fnv::FnvHasher::default();
+    backtrace::trace(|frame| {
+        hasher.write_usize(frame.ip() as usize);
+        true
+    });
+    hasher.finish()
+}
+
+/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock).
+#[cold]
+fn track_alloc_call(ptr: *mut u8, layout: Layout) {
+    if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize {
+        let callsite_hash = backtrace_hash();
+        let recording_response =
+            alloc_tracker::record_allocation(callsite_hash, layout.size() as u64, ptr);
+
+        match recording_response {
+            AllocRecordingResponse::ThresholdExceeded(stat_for_trace) => {
+                // warning: stdout might allocate a buffer on first use
+                print_backtrace(callsite_hash, stat_for_trace);
+            }
+            AllocRecordingResponse::TrackerFull(table_name) => {
+                // this message might be displayed multiple times but that's fine
+                // warning: stdout might allocate a buffer on first use
+                error!("heap profiling stopped, {table_name} full");
+                FLAGS.enabled.store(false, Ordering::Relaxed);
+            }
+            AllocRecordingResponse::ThresholdNotExceeded => {}
+            AllocRecordingResponse::NotStarted => {}
+        }
+    }
+}
+
+/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock).
+#[cold]
+fn track_dealloc_call(ptr: *mut u8, layout: Layout) {
+    if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize {
+        alloc_tracker::record_deallocation(ptr);
+    }
+}
+
+#[cold]
+fn track_realloc_call(
+    _old_ptr: *mut u8,
+    _new_pointer: *mut u8,
+    _current_layout: Layout,
+    _new_size: usize,
+) {
+    // TODO handle realloc
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_size_of_flags() {
+        assert_eq!(std::mem::size_of::<Flags>(), 128);
+    }
+}
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 12987898b0f..2b9fa51474d 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -16,9 +16,13 @@
 
 mod coolid;
 
+#[cfg(feature = "jemalloc-profiled")]
+pub(crate) mod alloc_tracker;
 pub mod binary_heap;
 pub mod fs;
 pub mod io;
+#[cfg(feature = "jemalloc-profiled")]
+pub mod jemalloc_profiled;
 mod kill_switch;
 pub mod metrics;
 pub mod net;
diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml
index 88c3e4278b4..1c66ccc0e31 100644
--- a/quickwit/quickwit-serve/Cargo.toml
+++ b/quickwit/quickwit-serve/Cargo.toml
@@ -106,6 +106,9 @@ quickwit-storage = { workspace = true, features = ["testsuite"] }
 pprof = [
   "dep:pprof"
 ]
+jemalloc-profiled = [
+  "quickwit-common/jemalloc-profiled"
+]
 testsuite = []
 sqs-for-tests = [
   "quickwit-indexing/sqs",
diff --git a/quickwit/quickwit-serve/src/developer_api/heap_prof.rs b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs
new file mode 100644
index 00000000000..0e777bae3c8
--- /dev/null
+++ b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs
@@ -0,0 +1,53 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use quickwit_common::jemalloc_profiled::{start_profiling, stop_profiling};
+use serde::Deserialize;
+use warp::Filter;
+use warp::reply::Reply;
+
+pub fn heap_prof_handlers()
+-> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+    #[derive(Deserialize)]
+    struct ProfilerQueryParams {
+        min_alloc_size: Option<u64>,
+        backtrace_every: Option<u64>,
+    }
+
+    let start_profiler = {
+        warp::path!("heap-prof" / "start")
+            .and(warp::query::<ProfilerQueryParams>())
+            .and_then(move |params: ProfilerQueryParams| start_profiler_handler(params))
+    };
+
+    let stop_profiler = { warp::path!("heap-prof" / "stop").and_then(stop_profiler_handler) };
+
+    async fn start_profiler_handler(
+        params: ProfilerQueryParams,
+    ) -> Result<warp::reply::Response, warp::Rejection> {
+        start_profiling(params.min_alloc_size, params.backtrace_every);
+        let resp = warp::reply::with_status("Heap profiling started", warp::http::StatusCode::OK)
+            .into_response();
+        Ok(resp)
+    }
+
+    async fn stop_profiler_handler() -> Result<warp::reply::Response, warp::Rejection> {
+        stop_profiling();
+        let resp = warp::reply::with_status("Heap profiling stopped", warp::http::StatusCode::OK)
+            .into_response();
+        Ok(resp)
+    }
+
+    start_profiler.or(stop_profiler)
+}
diff --git a/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs b/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs
new file mode 100644
index 00000000000..a71f724ae0d
--- /dev/null
+++ b/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs
@@ -0,0 +1,29 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use warp::Filter;
+
+fn not_implemented_handler() -> impl warp::Reply {
+    warp::reply::with_status(
+        "Quickwit was compiled without the `jemalloc-profiled` feature",
+        warp::http::StatusCode::NOT_IMPLEMENTED,
+    )
+}
+
+pub fn heap_prof_handlers()
+-> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+    let start_profiler = { warp::path!("heap-prof" / "start").map(not_implemented_handler) };
+    let stop_profiler = { warp::path!("heap-prof" / "stop").map(not_implemented_handler) };
+    start_profiler.or(stop_profiler)
+}
diff --git a/quickwit/quickwit-serve/src/developer_api/mod.rs b/quickwit/quickwit-serve/src/developer_api/mod.rs
index 4163db9c933..c7722d3a581 100644
--- a/quickwit/quickwit-serve/src/developer_api/mod.rs
+++ b/quickwit/quickwit-serve/src/developer_api/mod.rs
@@ -13,14 +13,16 @@
 // limitations under the License.
 
 mod debug;
-mod log_level;
+#[cfg_attr(not(feature = "jemalloc-profiled"), path = "heap_prof_disabled.rs")]
+mod heap_prof;
+mod log_level;
 
 #[cfg_attr(not(feature = "pprof"), path = "pprof_disabled.rs")]
 mod pprof;
-
 mod server;
 
 use debug::debug_handler;
+use heap_prof::heap_prof_handlers;
 use log_level::log_level_handler;
 use pprof::pprof_handlers;
 use quickwit_cluster::Cluster;
@@ -42,7 +44,8 @@ pub(crate) fn developer_api_routes(
         .and(
             debug_handler(cluster.clone())
                 .or(log_level_handler(env_filter_reload_fn.clone()).boxed())
-                .or(pprof_handlers()),
+                .or(pprof_handlers())
+                .or(heap_prof_handlers()),
         )
         .recover(recover_fn)
 }
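
Usage sketch (not part of the patch): assuming a node is built with the `jemalloc-profiled` feature (for example via the `release-heap-profiled` feature set) and that the developer API is mounted under `/api/developer` on the default REST port 7280, like the existing pprof and log-level handlers, profiling could be driven as follows. The query parameter names come from ProfilerQueryParams above; the host and port are assumptions.

    # start tracking allocations >= 128 KiB, log a backtrace roughly every 512 MB
    # of new allocations per call site
    curl "http://127.0.0.1:7280/api/developer/heap-prof/start?min_alloc_size=131072&backtrace_every=536870912"

    # ... exercise the workload under investigation ...

    # stop tracking; the "htrk callsite=... allocs=... size=..." lines and their
    # backtraces are written to the node's stdout
    curl "http://127.0.0.1:7280/api/developer/heap-prof/stop"

Omitting the query parameters falls back to the defaults compiled into jemalloc_profiled.rs (64 KiB minimum tracked allocation, 1 GiB reporting interval).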