diff --git a/Cargo.lock b/Cargo.lock index c168d8f..c82d8db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3319,9 +3319,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "aws-lc-rs", "bytes", diff --git a/crates/burn-central-fleet/src/inference.rs b/crates/burn-central-fleet/src/inference.rs index 933c301..1dbfc60 100644 --- a/crates/burn-central-fleet/src/inference.rs +++ b/crates/burn-central-fleet/src/inference.rs @@ -82,14 +82,6 @@ where } fn maybe_sync_and_rollout(&self) -> Result<(), FleetManagedInferenceError> { - let fleet_key = self.current_fleet_key(); - let reconcile_span = tracing::info_span!( - "fleet.inference.reconcile", - fleet_key = fleet_key.as_str(), - inference_name = self.inference_name.as_str(), - ); - let _reconcile_guard = reconcile_span.enter(); - if self.active().is_some() && !self.should_sync_now() { return Ok(()); } @@ -219,7 +211,7 @@ where fn infer(&self, input: Self::Input, writer: InferenceWriter) { let fleet_key = self.current_fleet_key(); let request_span = tracing::info_span!( - "fleet.inference.request", + "fleet.inference", fleet_key = fleet_key.as_str(), inference_name = self.inference_name.as_str(), ); diff --git a/crates/burn-central-fleet/src/model.rs b/crates/burn-central-fleet/src/model.rs index 41793d9..a16bca8 100644 --- a/crates/burn-central-fleet/src/model.rs +++ b/crates/burn-central-fleet/src/model.rs @@ -184,7 +184,6 @@ pub fn load_cached_model_source( } tracing::info!( - root = %models_root.display(), version = model_version_id, "reading model source metadata for model version" ); @@ -193,10 +192,7 @@ pub fn load_cached_model_source( let manifest_path = model_root.join("manifest.json"); if !manifest_path.exists() { - tracing::error!( - path = %manifest_path.display(), - "cached model manifest not found for active model version" - ); + tracing::error!("cached model manifest not found for active model version"); return Err(ModelCacheError::MissingCachedFile( manifest_path.display().to_string(), )); diff --git a/crates/burn-central-fleet/src/telemetry/logs.rs b/crates/burn-central-fleet/src/telemetry/logs.rs index b31b3a2..e3a0721 100644 --- a/crates/burn-central-fleet/src/telemetry/logs.rs +++ b/crates/burn-central-fleet/src/telemetry/logs.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use tracing::Dispatch; use tracing::field::{Field, Visit}; use tracing_subscriber::registry::LookupSpan; @@ -60,9 +61,9 @@ impl EventFieldVisitor { self.message = Some(value.clone()); } else if key == "fleet_key" { self.fleet_key = Some(value.clone()); + } else { + self.fields.push(LogField { key, value }); } - - self.fields.push(LogField { key, value }); } } @@ -117,12 +118,49 @@ impl SpanFields { } #[derive(Debug, Default)] -pub struct TelemetryLogLayer; +pub struct TelemetryLogLayer { + with_current_fleet_key: Option Option>, +} + +impl TelemetryLogLayer { + pub(crate) fn current_fleet_key(&self, dispatch: &Dispatch) -> Option { + let current = dispatch.current_span(); + let id = current.id()?; + (self.with_current_fleet_key?)(dispatch, id) + } +} + +/// Retrieves the fleet key from the current span context, if any. Returns None if there is no current span or if no fleet key is associated with the current span. +pub(crate) fn current_fleet_key() -> Option { + tracing::dispatcher::get_default(|dispatch| { + dispatch + .downcast_ref::()? + .current_fleet_key(dispatch) + }) +} impl tracing_subscriber::Layer for TelemetryLogLayer where S: tracing::Subscriber + for<'a> LookupSpan<'a>, { + fn on_layer(&mut self, _: &mut S) { + self.with_current_fleet_key = Some(|dispatch, id| { + let subscriber = dispatch.downcast_ref::()?; + let span = subscriber.span(id)?; + let mut fleet_key = None; + + for scope_span in span.scope().from_root() { + if let Some(span_fields) = scope_span.extensions().get::() { + if let Some(candidate) = span_fields.fleet_key.as_ref() { + fleet_key = Some(candidate.clone()); + } + } + } + + fleet_key + }); + } + fn on_new_span( &self, attrs: &tracing::span::Attributes<'_>, @@ -222,7 +260,7 @@ mod tests { .expect("test serial lock should not be poisoned"); super::super::clear_dispatched_log_records_for_test(); - let subscriber = tracing_subscriber::registry().with(TelemetryLogLayer); + let subscriber = tracing_subscriber::registry().with(TelemetryLogLayer::default()); tracing::subscriber::with_default(subscriber, test_fn); super::super::take_dispatched_log_records_for_test() @@ -285,23 +323,4 @@ mod tests { "events without fleet key should not be dispatched", ); } - - #[test] - fn log_layer_prefers_event_fleet_key_over_span_fleet_key() { - let records = run_with_log_layer(|| { - let span = tracing::info_span!("request", fleet_key = "span-fleet"); - let _guard = span.enter(); - - tracing::info!(fleet_key = "event-fleet", "override fleet key"); - }); - - assert_eq!(records.len(), 1); - let record = &records[0]; - assert_eq!(record.fleet_key, "event-fleet"); - assert_eq!(field_value(record, "fleet_key"), Some("event-fleet")); - assert_eq!( - field_value(record, "span.request.fleet_key"), - Some("span-fleet") - ); - } } diff --git a/crates/burn-central-fleet/src/telemetry/metrics.rs b/crates/burn-central-fleet/src/telemetry/metrics.rs index 609ebf2..5117a11 100644 --- a/crates/burn-central-fleet/src/telemetry/metrics.rs +++ b/crates/burn-central-fleet/src/telemetry/metrics.rs @@ -4,6 +4,10 @@ use std::collections::{BTreeMap, BTreeSet}; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; +use super::logs::current_fleet_key; + +const FLEET_KEY_LABEL: &str = "fleet_key"; + #[derive(Debug)] struct InnerRegistry { registry: metrics_util::registry::Registry, @@ -36,7 +40,7 @@ impl InnerRegistry { .into() } - fn snapshot(&self) -> MetricBatch { + fn snapshot(&self, fleet_key: &str) -> MetricBatch { let mut counters = Vec::new(); let mut gauges = Vec::new(); { @@ -44,6 +48,10 @@ impl InnerRegistry { self.registry.visit_counters(|key, counter| { let key = MetricKey::from_key(key); + if !key.has_label(FLEET_KEY_LABEL, fleet_key) { + return; + } + let value = counter.load(Ordering::Acquire); let previous = delta_store.counter_values.insert(key.clone(), value); let delta = previous.map_or(value, |last_value| value.saturating_sub(last_value)); @@ -59,6 +67,10 @@ impl InnerRegistry { } let key = MetricKey::from_key(key); + if !key.has_label(FLEET_KEY_LABEL, fleet_key) { + return; + } + let previous = delta_store.gauge_values.get(&key).copied(); if previous != Some(value) { gauges.push(MetricGauge { @@ -74,11 +86,15 @@ impl InnerRegistry { let mut histograms = Vec::new(); self.registry.visit_histograms(|key, histogram| { + let key = MetricKey::from_key(key); + if !key.has_label(FLEET_KEY_LABEL, fleet_key) { + return; + } + let mut samples = Vec::new(); histogram.clear_with(|chunk| samples.extend_from_slice(chunk)); - if let Some(summary) = MetricHistogram::from_samples(MetricKey::from_key(key), samples) - { + if let Some(summary) = MetricHistogram::from_samples(key, samples) { histograms.push(summary); } }); @@ -98,6 +114,10 @@ impl InnerRegistry { unit: Option, description: metrics::SharedString, ) { + let Some(fleet_key) = current_fleet_key() else { + return; + }; + let descriptor = MetricDescriptor { name: key.as_str().to_string(), kind, @@ -107,27 +127,45 @@ impl InnerRegistry { let descriptor_key = MetricDescriptorKey::new(descriptor.name.clone(), descriptor.kind); let mut store_guard = self.descriptor_store.lock().unwrap(); - let changed = store_guard.descriptors.get(&descriptor_key) != Some(&descriptor); + let fleet_store = store_guard.by_fleet.entry(fleet_key).or_default(); + let changed = fleet_store.descriptors.get(&descriptor_key) != Some(&descriptor); if changed { - store_guard + fleet_store .descriptors .insert(descriptor_key.clone(), descriptor); - store_guard.dirty.insert(descriptor_key); + fleet_store.dirty.insert(descriptor_key); } } - fn take_descriptor_delta(&self) -> MetricDescriptorBatch { + fn take_descriptor_delta(&self, fleet_key: &str) -> MetricDescriptorBatch { let mut store_guard = self.descriptor_store.lock().unwrap(); - let dirty_keys = store_guard.dirty.iter().cloned().collect::>(); + let Some(fleet_store) = store_guard.by_fleet.get_mut(fleet_key) else { + return MetricDescriptorBatch { + descriptors: Vec::new(), + }; + }; + + let dirty_keys = fleet_store.dirty.iter().cloned().collect::>(); let mut descriptors = Vec::with_capacity(dirty_keys.len()); for key in dirty_keys { - if let Some(descriptor) = store_guard.descriptors.get(&key) { + if let Some(descriptor) = fleet_store.descriptors.get(&key) { descriptors.push(descriptor.clone()); } } - store_guard.dirty.clear(); + fleet_store.dirty.clear(); MetricDescriptorBatch { descriptors } } + + fn remove_descriptor_consumer(&self, fleet_key: &str) { + let mut store_guard = self.descriptor_store.lock().unwrap(); + let Some(fleet_store) = store_guard.by_fleet.get_mut(fleet_key) else { + return; + }; + + fleet_store + .dirty + .extend(fleet_store.descriptors.keys().cloned()); + } } #[derive(Debug, Clone)] @@ -139,13 +177,18 @@ impl RecorderHandle { /// Produces a snapshot of recorded metrics. /// Counters are emitted as positive deltas since the prior snapshot. /// Gauges are emitted only when their value changes. - pub fn snapshot(&self) -> MetricBatch { - self.registry.snapshot() + /// Histograms are emitted as full snapshots of all recorded samples, and are cleared after each snapshot. + pub fn snapshot(&self, fleet_key: &str) -> MetricBatch { + self.registry.snapshot(fleet_key) + } + + /// Drains newly registered or updated metric descriptors for a single fleet. + pub fn take_descriptor_delta(&self, fleet_key: &str) -> MetricDescriptorBatch { + self.registry.take_descriptor_delta(fleet_key) } - /// Drains newly registered or updated metric descriptors since the last call. - pub fn take_descriptor_delta(&self) -> MetricDescriptorBatch { - self.registry.take_descriptor_delta() + pub fn remove_descriptor_consumer(&self, fleet_key: &str) { + self.registry.remove_descriptor_consumer(fleet_key); } } @@ -266,6 +309,11 @@ impl MetricDescriptorKey { #[derive(Debug, Default)] struct DescriptorStore { + by_fleet: BTreeMap, +} + +#[derive(Debug, Default)] +struct FleetDescriptorStore { descriptors: BTreeMap, dirty: BTreeSet, } @@ -304,6 +352,12 @@ impl MetricKey { labels, } } + + fn has_label(&self, key: &str, value: &str) -> bool { + self.labels + .iter() + .any(|label| label.key == key && label.value == value) + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -374,6 +428,37 @@ impl MetricHistogram { #[cfg(test)] mod tests { use super::*; + use tracing_subscriber::layer::SubscriberExt; + + const TEST_FLEET_KEY: &str = "fleet-a"; + + fn with_fleet_span(fleet_key: &str, test_fn: impl FnOnce()) { + let subscriber = tracing_subscriber::registry() + .with(crate::telemetry::logs::TelemetryLogLayer::default()); + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!("test.metric_descriptor", fleet_key = fleet_key); + let _guard = span.enter(); + test_fn(); + }); + } + + fn describe_counter_for_fleet( + recorder: &InMemoryMetricsRecorder, + fleet_key: &str, + name: &str, + description: &str, + ) { + let name = name.to_string(); + let description = description.to_string(); + with_fleet_span(fleet_key, || { + metrics::Recorder::describe_counter( + recorder, + name.into(), + Some(metrics::Unit::Count), + description.into(), + ); + }); + } #[test] fn snapshot_collects_counter_gauge_and_histogram() { @@ -381,13 +466,17 @@ mod tests { let handle = recorder.handle(); metrics::with_local_recorder(&recorder, || { - metrics::counter!("fleet.counter", "kind" => "request").increment(3); - metrics::gauge!("fleet.gauge", "kind" => "memory").set(12.5); - metrics::histogram!("fleet.hist", "kind" => "latency").record(2.0); - metrics::histogram!("fleet.hist", "kind" => "latency").record(4.0); + metrics::counter!("fleet.counter", "kind" => "request", "fleet_key" => TEST_FLEET_KEY) + .increment(3); + metrics::gauge!("fleet.gauge", "kind" => "memory", "fleet_key" => TEST_FLEET_KEY) + .set(12.5); + metrics::histogram!("fleet.hist", "kind" => "latency", "fleet_key" => TEST_FLEET_KEY) + .record(2.0); + metrics::histogram!("fleet.hist", "kind" => "latency", "fleet_key" => TEST_FLEET_KEY) + .record(4.0); }); - let batch = handle.snapshot(); + let batch = handle.snapshot(TEST_FLEET_KEY); let counter = batch .counters @@ -419,11 +508,11 @@ mod tests { let handle = recorder.handle(); metrics::with_local_recorder(&recorder, || { - metrics::histogram!("fleet.hist.drain").record(1.0); - metrics::histogram!("fleet.hist.drain").record(3.0); + metrics::histogram!("fleet.hist.drain", "fleet_key" => TEST_FLEET_KEY).record(1.0); + metrics::histogram!("fleet.hist.drain", "fleet_key" => TEST_FLEET_KEY).record(3.0); }); - let first = handle.snapshot(); + let first = handle.snapshot(TEST_FLEET_KEY); let first_hist = first .histograms .iter() @@ -431,7 +520,7 @@ mod tests { .expect("first snapshot should contain histogram"); assert_eq!(first_hist.count, 2); - let second = handle.snapshot(); + let second = handle.snapshot(TEST_FLEET_KEY); let second_hist = second .histograms .iter() @@ -445,12 +534,22 @@ mod tests { let handle = recorder.handle(); metrics::with_local_recorder(&recorder, || { - metrics::counter!("fleet.counter.persist", "kind" => "request").increment(5); - metrics::gauge!("fleet.gauge.persist", "kind" => "memory").set(64.0); + metrics::counter!( + "fleet.counter.persist", + "kind" => "request", + "fleet_key" => TEST_FLEET_KEY + ) + .increment(5); + metrics::gauge!( + "fleet.gauge.persist", + "kind" => "memory", + "fleet_key" => TEST_FLEET_KEY + ) + .set(64.0); }); - let first = handle.snapshot(); - let second = handle.snapshot(); + let first = handle.snapshot(TEST_FLEET_KEY); + let second = handle.snapshot(TEST_FLEET_KEY); let first_counter = first .counters @@ -469,11 +568,21 @@ mod tests { assert!(second.gauges.is_empty()); metrics::with_local_recorder(&recorder, || { - metrics::counter!("fleet.counter.persist", "kind" => "request").increment(2); - metrics::gauge!("fleet.gauge.persist", "kind" => "memory").set(64.0); + metrics::counter!( + "fleet.counter.persist", + "kind" => "request", + "fleet_key" => TEST_FLEET_KEY + ) + .increment(2); + metrics::gauge!( + "fleet.gauge.persist", + "kind" => "memory", + "fleet_key" => TEST_FLEET_KEY + ) + .set(64.0); }); - let third = handle.snapshot(); + let third = handle.snapshot(TEST_FLEET_KEY); let third_counter = third .counters .iter() @@ -483,10 +592,15 @@ mod tests { assert!(third.gauges.is_empty()); metrics::with_local_recorder(&recorder, || { - metrics::gauge!("fleet.gauge.persist", "kind" => "memory").set(96.0); + metrics::gauge!( + "fleet.gauge.persist", + "kind" => "memory", + "fleet_key" => TEST_FLEET_KEY + ) + .set(96.0); }); - let fourth = handle.snapshot(); + let fourth = handle.snapshot(TEST_FLEET_KEY); assert!(fourth.counters.is_empty()); let fourth_gauge = fourth .gauges @@ -501,20 +615,22 @@ mod tests { let recorder = InMemoryMetricsRecorder::new(); let handle = recorder.handle(); - metrics::Recorder::describe_counter( - &recorder, - "fleet.requests.total".into(), - Some(metrics::Unit::Count), - "Total request count".into(), - ); - metrics::Recorder::describe_histogram( - &recorder, - "fleet.request.duration".into(), - Some(metrics::Unit::Milliseconds), - "Request duration".into(), - ); + with_fleet_span(TEST_FLEET_KEY, || { + metrics::Recorder::describe_counter( + &recorder, + "fleet.requests.total".into(), + Some(metrics::Unit::Count), + "Total request count".into(), + ); + metrics::Recorder::describe_histogram( + &recorder, + "fleet.request.duration".into(), + Some(metrics::Unit::Milliseconds), + "Request duration".into(), + ); + }); - let first = handle.take_descriptor_delta(); + let first = handle.take_descriptor_delta(TEST_FLEET_KEY); assert_eq!(first.descriptors.len(), 2); assert!(first.descriptors.iter().any(|descriptor| { descriptor.name == "fleet.requests.total" @@ -529,7 +645,7 @@ mod tests { && descriptor.description == "Request duration" })); - let second = handle.take_descriptor_delta(); + let second = handle.take_descriptor_delta(TEST_FLEET_KEY); assert!(second.descriptors.is_empty()); } @@ -538,13 +654,80 @@ mod tests { let recorder = InMemoryMetricsRecorder::new(); let handle = recorder.handle(); - metrics::Recorder::describe_counter( - &recorder, - "fleet.requests.total".into(), - Some(metrics::Unit::Count), - "Total requests".into(), - ); - let _ = handle.take_descriptor_delta(); + with_fleet_span(TEST_FLEET_KEY, || { + metrics::Recorder::describe_counter( + &recorder, + "fleet.requests.total".into(), + Some(metrics::Unit::Count), + "Total requests".into(), + ); + }); + let _ = handle.take_descriptor_delta(TEST_FLEET_KEY); + + with_fleet_span(TEST_FLEET_KEY, || { + metrics::Recorder::describe_counter( + &recorder, + "fleet.requests.total".into(), + Some(metrics::Unit::Count), + "Total requests".into(), + ); + }); + let unchanged = handle.take_descriptor_delta(TEST_FLEET_KEY); + assert!(unchanged.descriptors.is_empty()); + + with_fleet_span(TEST_FLEET_KEY, || { + metrics::Recorder::describe_counter( + &recorder, + "fleet.requests.total".into(), + Some(metrics::Unit::Count), + "Total requests seen".into(), + ); + }); + let changed = handle.take_descriptor_delta(TEST_FLEET_KEY); + assert_eq!(changed.descriptors.len(), 1); + assert_eq!(changed.descriptors[0].description, "Total requests seen"); + } + + #[test] + fn snapshot_isolated_by_fleet_key() { + let recorder = InMemoryMetricsRecorder::new(); + let handle = recorder.handle(); + + metrics::with_local_recorder(&recorder, || { + metrics::counter!("fleet.counter.multi", "fleet_key" => "fleet-a").increment(3); + metrics::counter!("fleet.counter.multi", "fleet_key" => "fleet-b").increment(7); + }); + + let fleet_a = handle.snapshot("fleet-a"); + let fleet_b = handle.snapshot("fleet-b"); + + assert_eq!(fleet_a.counters.len(), 1); + assert_eq!(fleet_a.counters[0].value, 3); + assert_eq!(fleet_b.counters.len(), 1); + assert_eq!(fleet_b.counters[0].value, 7); + } + + #[test] + fn descriptor_delta_isolated_by_fleet_key() { + let recorder = InMemoryMetricsRecorder::new(); + let handle = recorder.handle(); + + describe_counter_for_fleet(&recorder, "fleet-a", "fleet.requests.a", "Fleet A requests"); + describe_counter_for_fleet(&recorder, "fleet-b", "fleet.requests.b", "Fleet B requests"); + + let fleet_a = handle.take_descriptor_delta("fleet-a"); + let fleet_b = handle.take_descriptor_delta("fleet-b"); + + assert_eq!(fleet_a.descriptors.len(), 1); + assert_eq!(fleet_a.descriptors[0].name, "fleet.requests.a"); + assert_eq!(fleet_b.descriptors.len(), 1); + assert_eq!(fleet_b.descriptors[0].name, "fleet.requests.b"); + } + + #[test] + fn descriptor_delta_ignores_descriptions_without_fleet_context() { + let recorder = InMemoryMetricsRecorder::new(); + let handle = recorder.handle(); metrics::Recorder::describe_counter( &recorder, @@ -552,17 +735,18 @@ mod tests { Some(metrics::Unit::Count), "Total requests".into(), ); - let unchanged = handle.take_descriptor_delta(); - assert!(unchanged.descriptors.is_empty()); - metrics::Recorder::describe_counter( - &recorder, - "fleet.requests.total".into(), - Some(metrics::Unit::Count), - "Total requests seen".into(), + assert!( + handle + .take_descriptor_delta("fleet-a") + .descriptors + .is_empty() + ); + assert!( + handle + .take_descriptor_delta("fleet-b") + .descriptors + .is_empty() ); - let changed = handle.take_descriptor_delta(); - assert_eq!(changed.descriptors.len(), 1); - assert_eq!(changed.descriptors[0].description, "Total requests seen"); } } diff --git a/crates/burn-central-fleet/src/telemetry/mod.rs b/crates/burn-central-fleet/src/telemetry/mod.rs index 710acda..b82e487 100644 --- a/crates/burn-central-fleet/src/telemetry/mod.rs +++ b/crates/burn-central-fleet/src/telemetry/mod.rs @@ -85,7 +85,7 @@ pub fn tracing_log_layer() -> impl tracing_subscriber::Layer where S: tracing::Subscriber + for<'a> LookupSpan<'a>, { - TelemetryLogLayer + TelemetryLogLayer::default() } /// Initializes global telemetry state. diff --git a/crates/burn-central-fleet/src/telemetry/pipeline/collector.rs b/crates/burn-central-fleet/src/telemetry/pipeline/collector.rs index 8c8c19f..1b5b287 100644 --- a/crates/burn-central-fleet/src/telemetry/pipeline/collector.rs +++ b/crates/burn-central-fleet/src/telemetry/pipeline/collector.rs @@ -40,12 +40,16 @@ impl Drop for CollectorHandle { } pub struct MetricsEventCollector { + fleet_key: String, recorder: RecorderHandle, } impl MetricsEventCollector { - pub fn new(recorder: RecorderHandle) -> Self { - Self { recorder } + pub fn new(fleet_key: impl Into, recorder: RecorderHandle) -> Self { + Self { + fleet_key: fleet_key.into(), + recorder, + } } } @@ -53,12 +57,12 @@ impl Collector for MetricsEventCollector { fn collect(&self) -> Result, String> { let mut events = Vec::new(); - let descriptor_delta = self.recorder.take_descriptor_delta(); + let descriptor_delta = self.recorder.take_descriptor_delta(&self.fleet_key); if !descriptor_delta.descriptors.is_empty() { events.push(TelemetryEvent::metric_descriptors(descriptor_delta)); } - let snapshot = self.recorder.snapshot(); + let snapshot = self.recorder.snapshot(&self.fleet_key); if !snapshot.is_empty() { events.push(TelemetryEvent::metrics(snapshot)); } @@ -67,6 +71,12 @@ impl Collector for MetricsEventCollector { } } +impl Drop for MetricsEventCollector { + fn drop(&mut self) { + self.recorder.remove_descriptor_consumer(&self.fleet_key); + } +} + pub struct LogsCollector { ingress: Arc, max_batch_entries: usize, diff --git a/crates/burn-central-fleet/src/telemetry/pipeline/mod.rs b/crates/burn-central-fleet/src/telemetry/pipeline/mod.rs index e6399c5..4ed478f 100644 --- a/crates/burn-central-fleet/src/telemetry/pipeline/mod.rs +++ b/crates/burn-central-fleet/src/telemetry/pipeline/mod.rs @@ -130,7 +130,10 @@ impl TelemetryPipeline { let collector_handles = vec![ collector::start( "telemetry-collector-metrics", - Arc::new(collector::MetricsEventCollector::new(recorder)), + Arc::new(collector::MetricsEventCollector::new( + fleet_key.clone(), + recorder, + )), outbox.clone(), Duration::from_secs(5), ), diff --git a/crates/burn-central-fleet/src/telemetry/pipeline/shipper.rs b/crates/burn-central-fleet/src/telemetry/pipeline/shipper.rs index 367017c..8668593 100644 --- a/crates/burn-central-fleet/src/telemetry/pipeline/shipper.rs +++ b/crates/burn-central-fleet/src/telemetry/pipeline/shipper.rs @@ -167,7 +167,7 @@ pub fn start( Ok(_) | Err(RecvTimeoutError::Disconnected) => break, Err(RecvTimeoutError::Timeout) => {} } - match outbox.claim(10) { + match outbox.claim(10_000) { Ok(None) => {} Ok(Some(items)) => { let (ids, events): (Vec<_>, Vec<_>) = items.into_iter().unzip();