Skip to content

Commit fbb8580

Browse files
authored
Add Tokio runtimes metrics (#4984)
1 parent c5c2b70 commit fbb8580

File tree

22 files changed

+169
-125
lines changed

22 files changed

+169
-125
lines changed

.cargo/config.toml

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
[build]
2+
rustflags = ["--cfg", "tokio_unstable"]
3+
rustdocflags = ["--cfg", "tokio_unstable"]
4+
15
[target.x86_64-unknown-linux-gnu]
2-
# Targetting x86-64-v2 gives a ~2% performance boost while only
6+
# Targeting x86-64-v2 gives a ~2% performance boost while only
37
# disallowing Intel CPUs older than 2008 and AMD CPUs older than 2011.
48
# None of those very old CPUs are used in GCP
59
# (https://cloud.google.com/compute/docs/cpu-platforms). Unfortunately,
610
# AWS does not seem to disclose the exact CPUs they use.
7-
rustflags = ["-C", "target-cpu=x86-64-v2"]
11+
rustflags = ["-C", "target-cpu=x86-64-v2", "--cfg", "tokio_unstable"]
812

.github/workflows/cbench.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ on:
1313
# pull request.
1414
pull_request_target:
1515

16+
env:
17+
RUSTFLAGS: --cfg tokio_unstable
18+
1619
concurrency:
1720
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
1821
cancel-in-progress: true
1922

20-
2123
jobs:
2224
tests:
2325
name: Benchmark

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ env:
1616
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
1717
RUST_BACKTRACE: 1
1818
RUSTDOCFLAGS: -Dwarnings -Arustdoc::private_intra_doc_links
19-
RUSTFLAGS: -Dwarnings
19+
RUSTFLAGS: -Dwarnings --cfg tokio_unstable
2020

2121
# Ensures that we cancel running jobs for the same PR / same workflow.
2222
concurrency:

.github/workflows/coverage.yml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ env:
2121
QW_S3_ENDPOINT: "http://localhost:4566" # Services are exposed as localhost because we are not running coverage in a container.
2222
QW_S3_FORCE_PATH_STYLE_ACCESS: 1
2323
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
24+
RUSTFLAGS: -Dwarnings --cfg tokio_unstable
2425

2526
jobs:
2627
test:

Dockerfile

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build
3939
WORKDIR /quickwit
4040

4141
RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \
42-
&& cargo build \
42+
&& RUSTFLAGS="--cfg tokio_unstable" \
43+
cargo build \
4344
-p quickwit-cli \
4445
--features $CARGO_FEATURES \
4546
--bin quickwit \

quickwit/Cargo.lock

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ tikv-jemalloc-ctl = "0.5"
231231
tikv-jemallocator = "0.5"
232232
time = { version = "0.3", features = ["std", "formatting", "macros"] }
233233
tokio = { version = "1.37", features = ["full"] }
234+
tokio-metrics = { version = "0.3.1", features = ["rt"] }
234235
tokio-stream = { version = "0.1", features = ["sync"] }
235236
tokio-util = { version = "0.7", features = ["full"] }
236237
toml = "0.7.6"

quickwit/quickwit-cli/src/main.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
use std::collections::BTreeMap;
2323

24+
use anyhow::Context;
2425
use colored::Colorize;
2526
use opentelemetry::global;
2627
use quickwit_cli::busy_detector;
@@ -29,17 +30,21 @@ use quickwit_cli::cli::{build_cli, CliCommand};
2930
#[cfg(feature = "jemalloc")]
3031
use quickwit_cli::jemalloc::start_jemalloc_metrics_loop;
3132
use quickwit_cli::logger::setup_logging_and_tracing;
33+
use quickwit_common::runtimes::scrape_tokio_runtime_metrics;
3234
use quickwit_serve::BuildInfo;
3335
use tracing::error;
3436

3537
fn main() -> anyhow::Result<()> {
36-
tokio::runtime::Builder::new_multi_thread()
38+
let rt = tokio::runtime::Builder::new_multi_thread()
3739
.enable_all()
3840
.on_thread_unpark(busy_detector::thread_unpark)
3941
.on_thread_park(busy_detector::thread_park)
4042
.build()
41-
.unwrap()
42-
.block_on(main_impl())
43+
.context("failed to start main Tokio runtime")?;
44+
45+
scrape_tokio_runtime_metrics(rt.handle(), "main");
46+
47+
rt.block_on(main_impl())
4348
}
4449

4550
fn register_build_info_metric() {

quickwit/quickwit-cli/src/split.rs

-8
Original file line numberDiff line numberDiff line change
@@ -340,14 +340,6 @@ async fn mark_splits_for_deletion_cli(args: MarkForDeletionArgs) -> anyhow::Resu
340340
Ok(())
341341
}
342342

343-
#[derive(Tabled)]
344-
struct FileRow {
345-
#[tabled(rename = "File Name")]
346-
file_name: String,
347-
#[tabled(rename = "Size")]
348-
size: String,
349-
}
350-
351343
async fn describe_split_cli(args: DescribeSplitArgs) -> anyhow::Result<()> {
352344
debug!(args=?args, "describe-split");
353345
let qw_client = args.client_args.client();

quickwit/quickwit-cluster/src/metrics.rs

+5
Original file line numberDiff line numberDiff line change
@@ -93,26 +93,31 @@ impl Default for ClusterMetrics {
9393
"gossip_recv_messages_total",
9494
"Total number of gossip messages received.",
9595
"cluster",
96+
&[],
9697
),
9798
gossip_recv_bytes_total: new_counter(
9899
"gossip_recv_bytes_total",
99100
"Total amount of gossip data received in bytes.",
100101
"cluster",
102+
&[],
101103
),
102104
gossip_sent_messages_total: new_counter(
103105
"gossip_sent_messages_total",
104106
"Total number of gossip messages sent.",
105107
"cluster",
108+
&[],
106109
),
107110
gossip_sent_bytes_total: new_counter(
108111
"gossip_sent_bytes_total",
109112
"Total amount of gossip data sent in bytes.",
110113
"cluster",
114+
&[],
111115
),
112116
grpc_gossip_rounds_total: new_counter(
113117
"grpc_gossip_rounds_total",
114118
"Total number of gRPC gossip rounds performed with peer nodes.",
115119
"cluster",
120+
&[],
116121
),
117122
}
118123
}

quickwit/quickwit-common/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ siphasher = { workspace = true }
3737
tempfile = { workspace = true }
3838
thiserror = { workspace = true }
3939
tokio = { workspace = true }
40+
tokio-metrics = { workspace = true }
4041
tokio-stream = { workspace = true }
4142
tonic = { workspace = true }
4243
tower = { workspace = true }

quickwit/quickwit-common/src/metrics.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub use prometheus::{
2626
IntCounter, IntCounterVec as PrometheusIntCounterVec, IntGauge,
2727
IntGaugeVec as PrometheusIntGaugeVec,
2828
};
29-
use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder};
29+
use prometheus::{Encoder, Gauge, HistogramOpts, Opts, TextEncoder};
3030

3131
#[derive(Clone)]
3232
pub struct HistogramVec<const N: usize> {
@@ -71,10 +71,20 @@ pub fn register_info(name: &'static str, help: &'static str, kvs: BTreeMap<&'sta
7171
prometheus::register(Box::new(counter)).expect("failed to register counter");
7272
}
7373

74-
pub fn new_counter(name: &str, help: &str, subsystem: &str) -> IntCounter {
74+
pub fn new_counter(
75+
name: &str,
76+
help: &str,
77+
subsystem: &str,
78+
const_labels: &[(&str, &str)],
79+
) -> IntCounter {
80+
let owned_const_labels: HashMap<String, String> = const_labels
81+
.iter()
82+
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
83+
.collect();
7584
let counter_opts = Opts::new(name, help)
7685
.namespace("quickwit")
77-
.subsystem(subsystem);
86+
.subsystem(subsystem)
87+
.const_labels(owned_const_labels);
7888
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
7989
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
8090
counter
@@ -104,6 +114,25 @@ pub fn new_counter_vec<const N: usize>(
104114
IntCounterVec { underlying }
105115
}
106116

117+
pub fn new_float_gauge(
118+
name: &str,
119+
help: &str,
120+
subsystem: &str,
121+
const_labels: &[(&str, &str)],
122+
) -> Gauge {
123+
let owned_const_labels: HashMap<String, String> = const_labels
124+
.iter()
125+
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
126+
.collect();
127+
let gauge_opts = Opts::new(name, help)
128+
.namespace("quickwit")
129+
.subsystem(subsystem)
130+
.const_labels(owned_const_labels);
131+
let gauge = Gauge::with_opts(gauge_opts).expect("failed to create float gauge");
132+
prometheus::register(Box::new(gauge.clone())).expect("failed to register float gauge");
133+
gauge
134+
}
135+
107136
pub fn new_gauge(
108137
name: &str,
109138
help: &str,

quickwit/quickwit-common/src/runtimes.rs

+77-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919

2020
use std::collections::HashMap;
2121
use std::sync::atomic::{AtomicUsize, Ordering};
22+
use std::time::Duration;
2223

2324
use once_cell::sync::OnceCell;
25+
use prometheus::{Gauge, IntCounter, IntGauge};
2426
use tokio::runtime::Runtime;
27+
use tokio_metrics::{RuntimeMetrics, RuntimeMonitor};
28+
29+
use crate::metrics::{new_counter, new_float_gauge, new_gauge};
2530

2631
static RUNTIMES: OnceCell<HashMap<RuntimeType, tokio::runtime::Runtime>> = OnceCell::new();
2732

@@ -63,7 +68,7 @@ impl RuntimesConfig {
6368
}
6469

6570
pub fn with_num_cpus(num_cpus: usize) -> Self {
66-
// Non blocking task are supposed to be io intensive, and not require many threads...
71+
// Non-blocking tasks are supposed to be IO-intensive and should not require many threads...
6772
let num_threads_non_blocking = if num_cpus > 6 { 2 } else { 1 };
6873
// On the other hand the blocking actors are cpu intensive. We allocate
6974
// almost all of the threads to them.
@@ -83,7 +88,8 @@ impl Default for RuntimesConfig {
8388
}
8489

8590
fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
86-
let mut runtimes = HashMap::default();
91+
let mut runtimes = HashMap::with_capacity(2);
92+
8793
let blocking_runtime = tokio::runtime::Builder::new_multi_thread()
8894
.worker_threads(config.num_threads_blocking)
8995
.thread_name_fn(|| {
@@ -94,7 +100,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
94100
.enable_all()
95101
.build()
96102
.unwrap();
103+
104+
scrape_tokio_runtime_metrics(blocking_runtime.handle(), "blocking");
97105
runtimes.insert(RuntimeType::Blocking, blocking_runtime);
106+
98107
let non_blocking_runtime = tokio::runtime::Builder::new_multi_thread()
99108
.worker_threads(config.num_threads_non_blocking)
100109
.thread_name_fn(|| {
@@ -105,7 +114,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
105114
.enable_all()
106115
.build()
107116
.unwrap();
117+
118+
scrape_tokio_runtime_metrics(non_blocking_runtime.handle(), "non_blocking");
108119
runtimes.insert(RuntimeType::NonBlocking, non_blocking_runtime);
120+
109121
runtimes
110122
}
111123

@@ -135,6 +147,69 @@ impl RuntimeType {
135147
}
136148
}
137149

150+
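/// Spawns a background task on `handle` that samples the runtime's Tokio metrics every second and publishes them as Prometheus metrics tagged with `label`.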
151+
pub fn scrape_tokio_runtime_metrics(handle: &tokio::runtime::Handle, label: &'static str) {
152+
let runtime_monitor = RuntimeMonitor::new(handle);
153+
handle.spawn(async move {
154+
let mut interval = tokio::time::interval(Duration::from_secs(1));
155+
let mut prometheus_runtime_metrics = PrometheusRuntimeMetrics::new(label);
156+
157+
for tokio_runtime_metrics in runtime_monitor.intervals() {
158+
interval.tick().await;
159+
prometheus_runtime_metrics.update(&tokio_runtime_metrics);
160+
}
161+
});
162+
}
163+
164+
struct PrometheusRuntimeMetrics {
165+
scheduled_tasks: IntGauge,
166+
worker_busy_duration_milliseconds_total: IntCounter,
167+
worker_busy_ratio: Gauge,
168+
worker_threads: IntGauge,
169+
}
170+
171+
impl PrometheusRuntimeMetrics {
172+
pub fn new(label: &'static str) -> Self {
173+
Self {
174+
scheduled_tasks: new_gauge(
175+
"tokio_scheduled_tasks",
176+
"The total number of tasks currently scheduled in workers' local queues.",
177+
"runtime",
178+
&[("runtime_type", label)],
179+
),
180+
worker_busy_duration_milliseconds_total: new_counter(
181+
"tokio_worker_busy_duration_milliseconds_total",
182+
" The total amount of time worker threads were busy.",
183+
"runtime",
184+
&[("runtime_type", label)],
185+
),
186+
worker_busy_ratio: new_float_gauge(
187+
"tokio_worker_busy_ratio",
188+
"The ratio of time worker threads were busy since the last time runtime metrics \
189+
were collected.",
190+
"runtime",
191+
&[("runtime_type", label)],
192+
),
193+
worker_threads: new_gauge(
194+
"tokio_worker_threads",
195+
"The number of worker threads used by the runtime.",
196+
"runtime",
197+
&[("runtime_type", label)],
198+
),
199+
}
200+
}
201+
202+
pub fn update(&mut self, runtime_metrics: &RuntimeMetrics) {
203+
self.scheduled_tasks
204+
.set(runtime_metrics.total_local_queue_depth as i64);
205+
self.worker_busy_duration_milliseconds_total
206+
.inc_by(runtime_metrics.total_busy_duration.as_millis() as u64);
207+
self.worker_busy_ratio.set(runtime_metrics.busy_ratio());
208+
self.worker_threads
209+
.set(runtime_metrics.workers_count as i64);
210+
}
211+
}
212+
138213
#[cfg(test)]
139214
mod tests {
140215
use super::*;

quickwit/quickwit-common/src/thread_pool.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl ThreadPool {
5151
}
5252
let thread_pool = rayon_pool_builder
5353
.build()
54-
.expect("failed to spawn the spawning pool");
54+
.expect("failed to spawn thread pool");
5555
let ongoing_tasks = THREAD_POOL_METRICS.ongoing_tasks.with_label_values([name]);
5656
let pending_tasks = THREAD_POOL_METRICS.pending_tasks.with_label_values([name]);
5757
ThreadPool {

0 commit comments

Comments
 (0)