Skip to content

Commit 00af202

Browse files
committed
feat: implement telemetry authentication token management in FleetDeviceSession and related components
1 parent 6682770 commit 00af202

5 files changed

Lines changed: 149 additions & 35 deletions

File tree

crates/burn-central-fleet/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use crate::{model, state, telemetry};
22

33
#[derive(Debug, thiserror::Error)]
44
pub enum FleetError {
5+
#[error("fleet registration failed: {0}")]
6+
RegistrationFailed(String),
57
#[error("fleet sync failed: {0}")]
68
SyncFailed(String),
79
#[error("fleet model download failed: {0}")]

crates/burn-central-fleet/src/session.rs

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{path::PathBuf, sync::Arc};
1+
use std::{
2+
path::PathBuf,
3+
sync::{Arc, RwLock},
4+
};
25

36
use burn_central_client::{Env, FleetClient};
47
use directories::{BaseDirs, ProjectDirs};
@@ -15,6 +18,7 @@ pub struct FleetDeviceSession {
1518
registration_token: FleetRegistrationToken,
1619
identity_key: String,
1720
state: state::FleetState,
21+
telemetry_auth_token: Arc<RwLock<Option<String>>>,
1822
client: FleetClient,
1923
fleet_key: String,
2024
store: state::FleetLocalStateStore,
@@ -40,18 +44,24 @@ impl FleetDeviceSession {
4044
let client = FleetClient::new(env.clone());
4145
let store = state::FleetLocalStateStore::new(root_dir.clone());
4246
let identity_key = store.load_or_create_machine_identity_key()?;
47+
let state = store.load_fleet_state(&fleet_key)?.unwrap_or_default();
48+
let telemetry_auth_token = Arc::new(RwLock::new(
49+
state
50+
.auth_token()
51+
.filter(|auth| auth.is_valid())
52+
.map(|auth| auth.token().to_string()),
53+
));
4354
let telemetry = TelemetryPipeline::get_or_init(
4455
fleet_key.clone(),
45-
registration_token.clone(),
46-
identity_key.clone(),
56+
telemetry_auth_token.clone(),
4757
client.clone(),
4858
root_dir,
4959
)?;
50-
let state = store.load_fleet_state(&fleet_key)?.unwrap_or_default();
5160
let fleet_device = FleetDeviceSession {
5261
registration_token,
5362
identity_key,
5463
state,
64+
telemetry_auth_token,
5565
client,
5666
fleet_key,
5767
store,
@@ -98,18 +108,48 @@ impl FleetDeviceSession {
98108
?metadata,
99109
"syncing fleet device with fleet management service"
100110
);
111+
112+
let should_refresh_token = match self.state.auth_token() {
113+
Some(auth) if auth.is_valid() => {
114+
tracing::debug!(
115+
"using existing auth token with ttl {} seconds",
116+
auth.expires_in_seconds().unwrap()
117+
);
118+
false
119+
}
120+
Some(_) => {
121+
tracing::info!(
122+
"existing auth token expired, requesting a new one from fleet management service"
123+
);
124+
true
125+
}
126+
None => {
127+
tracing::info!(
128+
"no existing auth token, requesting new one from fleet management service"
129+
);
130+
true
131+
}
132+
};
133+
134+
if should_refresh_token {
135+
self.refresh_auth_token(metadata.clone())?;
136+
tracing::info!("successfully refreshed auth token");
137+
}
138+
let auth_token = self
139+
.state
140+
.auth_token()
141+
.ok_or_else(|| FleetError::SyncFailed("missing auth token after refresh".to_string()))?
142+
.token()
143+
.to_string();
144+
101145
let snapshot = self
102146
.client
103-
.sync(
104-
self.registration_token.clone(),
105-
self.identity_key.clone(),
106-
metadata,
107-
)
147+
.sync(&auth_token, metadata)
108148
.map_err(|e| FleetError::SyncFailed(e.to_string()))?;
109149

110150
let download = self
111151
.client
112-
.model_download(self.registration_token.clone(), self.identity_key.clone())
152+
.model_download(&auth_token)
113153
.map_err(|e| FleetError::DownloadFailed(e.to_string()))?;
114154

115155
model::ensure_cached_model(
@@ -129,6 +169,28 @@ impl FleetDeviceSession {
129169
.save_fleet_state(&self.fleet_key, &self.state)
130170
.map_err(FleetError::from)
131171
}
172+
173+
fn refresh_auth_token(&mut self, metadata: Option<DeviceMetadata>) -> Result<(), FleetError> {
174+
let auth_response = self
175+
.client
176+
.register(
177+
self.registration_token.clone(),
178+
self.identity_key.clone(),
179+
metadata,
180+
)
181+
.map_err(|e| FleetError::RegistrationFailed(e.to_string()))?;
182+
183+
self.state
184+
.set_auth_token(auth_response.access_token, auth_response.expires_in_seconds);
185+
186+
let mut telemetry_auth_token = self
187+
.telemetry_auth_token
188+
.write()
189+
.map_err(|_| FleetError::SyncFailed("telemetry auth token lock poisoned".to_string()))?;
190+
*telemetry_auth_token = self.state.auth_token().map(|auth| auth.token().to_string());
191+
192+
Ok(())
193+
}
132194
}
133195

134196
/// Get the default cache directory for a given environment.

crates/burn-central-fleet/src/state.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,48 @@ use serde::de::DeserializeOwned;
77
use serde::{Deserialize, Serialize};
88
use sha2::{Digest, Sha256};
99

10+
#[derive(Debug, Clone, Serialize, Deserialize)]
11+
pub struct AuthToken {
12+
/// The authentication token string to be used in requests to the fleet management service.
13+
token: String,
14+
/// The time-to-live of the authentication token, in seconds.
15+
ttl_seconds: u64,
16+
/// The timestamp of when the authentication token was last updated.
17+
updated_at: String,
18+
}
19+
20+
impl AuthToken {
21+
/// Check if the authentication token is still valid based on its TTL and last updated timestamp.
22+
pub fn is_valid(&self) -> bool {
23+
let updated_at = match chrono::DateTime::parse_from_rfc3339(&self.updated_at) {
24+
Ok(dt) => dt,
25+
Err(_) => return false,
26+
};
27+
28+
let expires_at = updated_at + chrono::Duration::seconds(self.ttl_seconds as i64);
29+
let now = chrono::Utc::now();
30+
now < expires_at
31+
}
32+
33+
/// Calculate the number of seconds until the authentication token expires, or return None if the timestamp is invalid.
34+
pub fn expires_in_seconds(&self) -> Option<i64> {
35+
let updated_at = chrono::DateTime::parse_from_rfc3339(&self.updated_at).ok()?;
36+
let expires_at = updated_at + chrono::Duration::seconds(self.ttl_seconds as i64);
37+
let now = chrono::Utc::now().with_timezone(&updated_at.timezone());
38+
Some((expires_at - now).num_seconds())
39+
}
40+
41+
/// Get a reference to the authentication token string.
42+
pub fn token(&self) -> &str {
43+
&self.token
44+
}
45+
}
46+
1047
/// The state of a device in the fleet management system, as stored on the device and synced with the fleet management service.
1148
#[derive(Serialize, Deserialize, Clone, Debug)]
1249
pub struct FleetState {
50+
/// The current authentication token for communicating with the fleet management service, if any.
51+
auth: Option<AuthToken>,
1352
/// The timestamp of the last update received by the device.
1453
updated_at: String,
1554
/// The id of the model version currently active on the device. Should be updated by the device when a new model version is activated.
@@ -21,6 +60,7 @@ pub struct FleetState {
2160
impl Default for FleetState {
2261
fn default() -> Self {
2362
Self {
63+
auth: None,
2464
updated_at: chrono::Utc::now().to_rfc3339(),
2565
active_model_version_id: String::new(),
2666
runtime_config: serde_json::json!({}),
@@ -29,6 +69,15 @@ impl Default for FleetState {
2969
}
3070

3171
impl FleetState {
72+
pub fn set_auth_token(&mut self, token: String, ttl_seconds: u64) {
73+
let now = chrono::Utc::now().to_rfc3339();
74+
self.auth = Some(AuthToken {
75+
token,
76+
ttl_seconds,
77+
updated_at: now,
78+
});
79+
}
80+
3281
pub fn update(&mut self, model_version_id: String, runtime_config: serde_json::Value) {
3382
self.updated_at = chrono::Utc::now().to_rfc3339();
3483
self.active_model_version_id = model_version_id;
@@ -42,7 +91,12 @@ impl FleetState {
4291
/// Borrow the runtime configuration JSON value held in this state.
pub fn runtime_config(&self) -> &serde_json::Value {
4392
&self.runtime_config
4493
}
94+
95+
/// Borrow the stored authentication token, or `None` if no token is stored.
///
/// This does not check expiry; callers that need a usable token should also
/// consult `AuthToken::is_valid`.
pub fn auth_token(&self) -> Option<&AuthToken> {
96+
self.auth.as_ref()
97+
}
4598
}
99+
46100
#[derive(Serialize, Deserialize, Clone, Debug)]
47101
struct IdentityState {
48102
identity_key: String,
@@ -186,6 +240,11 @@ mod tests {
186240

187241
let fleet_key = fleet_key_from_registration_token("reg-token");
188242
let state = FleetState {
243+
auth: Some(AuthToken {
244+
token: "auth-token".to_string(),
245+
ttl_seconds: 3600,
246+
updated_at: "2026-01-01T00:00:00Z".to_string(),
247+
}),
189248
updated_at: "2026-01-01T00:00:00Z".to_string(),
190249
active_model_version_id: "model-v1".to_string(),
191250
runtime_config: serde_json::json!({ "sample_rate": 0.2 }),

crates/burn-central-fleet/src/telemetry/pipeline/mod.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
sync::{
77
Arc,
88
atomic::{AtomicUsize, Ordering},
9+
RwLock,
910
},
1011
time::Duration,
1112
};
@@ -80,8 +81,7 @@ pub struct TelemetryPipeline {
8081
impl TelemetryPipeline {
8182
pub fn get_or_init(
8283
fleet_key: String,
83-
registration_token: String,
84-
identity_key: String,
84+
auth_token: Arc<RwLock<Option<String>>>,
8585
client: FleetClient,
8686
root_dir: PathBuf,
8787
) -> Result<Arc<Self>, TelemetryPipelineError> {
@@ -96,8 +96,7 @@ impl TelemetryPipeline {
9696
let recorder = global_recorder_handle();
9797
let pipeline = Arc::new(Self::start(
9898
fleet_key.clone(),
99-
registration_token,
100-
identity_key,
99+
auth_token,
101100
client,
102101
recorder,
103102
root_dir,
@@ -112,8 +111,7 @@ impl TelemetryPipeline {
112111

113112
fn start(
114113
fleet_key: String,
115-
registration_token: String,
116-
identity_key: String,
114+
auth_token: Arc<RwLock<Option<String>>>,
117115
client: FleetClient,
118116
recorder: RecorderHandle,
119117
root_dir: PathBuf,
@@ -147,11 +145,7 @@ impl TelemetryPipeline {
147145

148146
let shipper_handle = shipper::start(
149147
outbox,
150-
Arc::new(shipper::BurnCentralFleetShipperTransport::new(
151-
registration_token,
152-
identity_key,
153-
client,
154-
)),
148+
Arc::new(shipper::BurnCentralFleetShipperTransport::new(auth_token, client)),
155149
Duration::from_secs(5),
156150
);
157151

crates/burn-central-fleet/src/telemetry/pipeline/shipper.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use std::sync::Arc;
21
use std::sync::mpsc::{RecvTimeoutError, Sender, channel};
2+
use std::sync::{Arc, RwLock};
33
use std::time::Duration;
44

55
use burn_central_client::FleetClient;
@@ -20,22 +20,13 @@ pub trait ShipperTransport: Send + Sync {
2020
}
2121

2222
pub struct BurnCentralFleetShipperTransport {
23-
registration_token: String,
24-
identity_key: String,
23+
auth_token: Arc<RwLock<Option<String>>>,
2524
client: FleetClient,
2625
}
2726

2827
impl BurnCentralFleetShipperTransport {
29-
pub fn new(
30-
registration_token: impl Into<String>,
31-
identity_key: impl Into<String>,
32-
client: FleetClient,
33-
) -> Self {
34-
Self {
35-
registration_token: registration_token.into(),
36-
identity_key: identity_key.into(),
37-
client,
38-
}
28+
pub fn new(auth_token: Arc<RwLock<Option<String>>>, client: FleetClient) -> Self {
29+
Self { auth_token, client }
3930
}
4031
}
4132

@@ -124,9 +115,15 @@ impl ShipperTransport for BurnCentralFleetShipperTransport {
124115
metrics,
125116
logs,
126117
};
118+
let auth_token = self
119+
.auth_token
120+
.read()
121+
.map_err(|_| "telemetry auth token lock poisoned".to_string())?
122+
.clone()
123+
.ok_or_else(|| "missing auth token for telemetry ingestion".to_string())?;
127124

128125
self.client
129-
.ingest_telemetry(&self.registration_token, &self.identity_key, telemetry)
126+
.ingest_telemetry(auth_token, telemetry)
130127
.map_err(|e| format!("failed to send telemetry events to Burn Central Fleet: {e}"))
131128
}
132129
}

0 commit comments

Comments
 (0)