Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 145 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions crates/burn-central-fleet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@ burn-central-registry.workspace = true
thiserror.workspace = true
serde.workspace = true
serde_json.workspace = true
opentelemetry.workspace = true
sha2.workspace = true
chrono.workspace = true
tracing.workspace = true
directories = "6.0.0"
arc-swap = "1.7.1"
arc-swap = "1.8.2"
once_cell = "1.21.3"

# telemetry
metrics = "0.24.3"
metrics-util = "0.20.1"
tracing-subscriber = "0.3"
metrics-tracing-context = "0.18"
crossbeam-queue = "0.3"
6 changes: 5 additions & 1 deletion crates/burn-central-fleet/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::{model, state};
use crate::{model, state, telemetry};

#[derive(Debug, thiserror::Error)]
pub enum FleetError {
#[error("fleet registration failed: {0}")]
RegistrationFailed(String),
#[error("fleet sync failed: {0}")]
SyncFailed(String),
#[error("fleet model download failed: {0}")]
Expand All @@ -12,4 +14,6 @@ pub enum FleetError {
State(#[from] state::FleetStateStoreError),
#[error(transparent)]
Model(#[from] model::ModelCacheError),
#[error("telemetry pipeline failed: {0}")]
TelemetryInitFailed(#[from] telemetry::TelemetryPipelineError),
}
21 changes: 21 additions & 0 deletions crates/burn-central-fleet/src/inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ where
}

fn maybe_sync_and_rollout(&self) -> Result<(), FleetManagedInferenceError> {
let fleet_key = self.current_fleet_key();
let reconcile_span = tracing::info_span!(
"fleet.inference.reconcile",
fleet_key = fleet_key.as_str(),
inference_name = self.inference_name.as_str(),
);
let _reconcile_guard = reconcile_span.enter();

if self.active().is_some() && !self.should_sync_now() {
return Ok(());
}
Expand Down Expand Up @@ -194,6 +202,10 @@ where
/// Returns the value currently held in the `active` field, or `None` when nothing is stored.
///
/// This is a thin wrapper over `load_full()` on `self.active`; the returned `Arc` is an
/// owned handle, so the caller can keep using it without borrowing from `self`.
// NOTE(review): `active` is presumably an `arc_swap::ArcSwapOption` — confirm against the
// struct definition, which is outside this view.
fn active(&self) -> Option<Arc<ActiveInference<I>>> {
self.active.load_full()
}

/// Returns an owned copy of the fleet key reported by the current fleet session.
///
/// The key is converted to a `String` so it can outlive the read guard on
/// `fleet_session` (e.g. to be attached to tracing spans or request metadata).
///
/// # Panics
///
/// Panics if acquiring the read guard on `fleet_session` fails, because the result of
/// `read()` is unwrapped.
fn current_fleet_key(&self) -> String {
    // Hold the read guard in a named binding; it is released when the function returns.
    let session = self.fleet_session.read().unwrap();
    session.fleet_key().to_string()
}
}

impl<B, I> Inference for FleetManagedInference<B, I>
Expand All @@ -205,6 +217,14 @@ where
type Output = <I as Inference>::Output;

fn infer(&self, input: Self::Input, writer: InferenceWriter<Self::Output>) {
let fleet_key = self.current_fleet_key();
let request_span = tracing::info_span!(
"fleet.inference.request",
fleet_key = fleet_key.as_str(),
inference_name = self.inference_name.as_str(),
);
let _request_guard = request_span.enter();

if let Err(err) = self.maybe_sync_and_rollout() {
writer.error(Box::new(err)).ok();
return;
Expand All @@ -221,6 +241,7 @@ where
};

let metadata = InferenceMetadata::new(
fleet_key,
self.inference_name.clone(),
"unknown".to_string(),
active.model_version.clone(),
Expand Down
1 change: 1 addition & 0 deletions crates/burn-central-fleet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use error::FleetError;
pub use inference::{FleetManagedFactory, FleetManagedInference, FleetManagedInferenceError};
pub use model::ModelSource;
pub use session::FleetDeviceSession;
pub use telemetry::{metrics_recorder, tracing_log_layer, tracing_metrics_layer};

pub type FleetRegistrationToken = String;

Expand Down
Loading
Loading