diff --git a/Cargo.toml b/Cargo.toml index 04e3f381..328359c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "mofa-dora-bridge", "mofa-ui", "apps/*", + "libs/mofa-metrics", ] exclude = ["apps/mofa-asr"] diff --git a/libs/mofa-metrics/Cargo.toml b/libs/mofa-metrics/Cargo.toml new file mode 100644 index 00000000..e8e2cad7 --- /dev/null +++ b/libs/mofa-metrics/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "mofa-metrics" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "Thread-safe metrics store and HTTP/WebSocket API for MoFA Studio Observatory" + +[dependencies] +serde = { workspace = true } +serde_json = { workspace = true } +parking_lot = { workspace = true } +tokio = { workspace = true } +log = { workspace = true } +uuid = { workspace = true } +axum = { version = "0.7", features = ["ws"] } +tower-http = { version = "0.5", features = ["cors", "timeout"] } +futures = "0.3" + +[dev-dependencies] +tokio = { workspace = true, features = ["full", "macros", "rt-multi-thread"] } diff --git a/libs/mofa-metrics/src/lib.rs b/libs/mofa-metrics/src/lib.rs new file mode 100644 index 00000000..f7289c75 --- /dev/null +++ b/libs/mofa-metrics/src/lib.rs @@ -0,0 +1,9 @@ +// Metrics store and HTTP/WS API for MoFA Studio Observatory. + +pub mod server; +pub mod store; +pub mod types; + +pub use server::ServerConfig; +pub use store::MetricsStore; +pub use types::*; diff --git a/libs/mofa-metrics/src/server.rs b/libs/mofa-metrics/src/server.rs new file mode 100644 index 00000000..c77b494c --- /dev/null +++ b/libs/mofa-metrics/src/server.rs @@ -0,0 +1,113 @@ +// HTTP + WebSocket server for Observatory metrics API. + +use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::routing::get; +use axum::{Json, Router}; +use futures::{sink::SinkExt, stream::StreamExt}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::TcpListener; +use tokio::sync::broadcast; +use tower_http::cors::CorsLayer; +use tower_http::timeout::TimeoutLayer; + +use crate::{AgentStatus, MetricsStore, PipelineTrace, SystemSnapshot}; + +/// Server config. +pub struct ServerConfig { + pub port: u16, +} + +/// Start metrics server as a background task. +pub fn start(store: Arc, cfg: ServerConfig) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let (tx, _) = broadcast::channel::(256); + store.init_broadcast(tx); + + let app = Router::new() + .route("/api/agents", get(h_list_agents)) + .route("/api/agents/{id}", get(h_get_agent)) + .route("/api/metrics", get(h_get_metrics)) + .route("/api/pipeline", get(h_get_pipeline)) + .route("/ws", get(h_ws)) + .layer(CorsLayer::permissive()) + .layer(TimeoutLayer::new(Duration::from_secs(30))) + .with_state(store); + + let addr = SocketAddr::from(([0, 0, 0, 0], cfg.port)); + let listener = match TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + log::error!("Failed to bind metrics server on port {}: {}", cfg.port, e); + return; + } + }; + log::info!( + "Observatory HTTP server listening on http://localhost:{}", + cfg.port + ); + axum::serve(listener, app).await.ok(); + }) +} + +// REST handlers +async fn h_list_agents(State(s): State>) -> Json> { + Json(s.get_agents()) +} + +async fn h_get_agent( + State(s): State>, + Path(id): Path, +) -> Result, StatusCode> { + s.get_agents() + .into_iter() + .find(|a| a.id == id) + .map(Json) + .ok_or(StatusCode::NOT_FOUND) +} + +async fn h_get_metrics(State(s): State>) -> Json> { + Json(s.get_latest_snapshot()) +} + +async fn h_get_pipeline(State(s): State>) -> Json> { + Json(s.get_traces(50)) +} + +// WebSocket +async fn h_ws(ws: WebSocketUpgrade, State(s): State>) -> impl IntoResponse { + ws.on_upgrade(|socket| ws_handler(socket, s)) +} + +async fn ws_handler(socket: WebSocket, store: Arc) { + let Some(mut rx) = store.subscribe() else { + return; + }; + let (mut sender, mut receiver): ( + futures::stream::SplitSink, + futures::stream::SplitStream, + ) = socket.split(); + + loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Ok(json) => { + let text_msg: Message = Message::Text(json.into()); + if sender.send(text_msg).await.is_err() { + break; + } + } + Err(_) => break, + } + } + incoming = receiver.next() => { + if incoming.is_none() { break; } + } + } + } +} diff --git a/libs/mofa-metrics/src/store.rs b/libs/mofa-metrics/src/store.rs new file mode 100644 index 00000000..90e75ce2 --- /dev/null +++ b/libs/mofa-metrics/src/store.rs @@ -0,0 +1,212 @@ +// Thread-safe metrics store with dirty tracking. + +use crate::{AgentStatus, MetricEvent, PipelineTrace, SystemSnapshot}; +use parking_lot::RwLock; +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::broadcast; + +pub struct MetricsStore { + agents: RwLock>, + traces: RwLock>, + snapshots: RwLock>, + + agents_dirty: AtomicBool, + traces_dirty: AtomicBool, + snapshots_dirty: AtomicBool, + + broadcast_tx: RwLock>>, +} + +// Ring buffer caps +const MAX_TRACES: usize = 200; +const MAX_SNAPSHOTS: usize = 300; + +impl MetricsStore { + pub fn new() -> Arc { + Arc::new(Self { + agents: RwLock::new(HashMap::new()), + traces: RwLock::new(VecDeque::new()), + snapshots: RwLock::new(VecDeque::new()), + agents_dirty: AtomicBool::new(false), + traces_dirty: AtomicBool::new(false), + snapshots_dirty: AtomicBool::new(false), + broadcast_tx: RwLock::new(None), + }) + } + + pub fn init_broadcast(&self, tx: broadcast::Sender) { + *self.broadcast_tx.write() = Some(tx); + } + + pub fn subscribe(&self) -> Option> { + self.broadcast_tx.read().as_ref().map(|tx| tx.subscribe()) + } + + fn broadcast_event(&self, event: &MetricEvent) { + if let Some(ref tx) = *self.broadcast_tx.read() { + if let Ok(json) = serde_json::to_string(event) { + let _ = tx.send(json); + } + } + } + + pub fn record_agent(&self, status: AgentStatus) { + self.broadcast_event(&MetricEvent::AgentStatusChanged(status.clone())); + self.agents.write().insert(status.id.clone(), status); + self.agents_dirty.store(true, Ordering::Release); + } + + pub fn push_trace(&self, trace: PipelineTrace) { + self.broadcast_event(&MetricEvent::PipelineCompleted(trace.clone())); + let mut buf = self.traces.write(); + if buf.len() >= MAX_TRACES { + buf.pop_front(); + } + buf.push_back(trace); + self.traces_dirty.store(true, Ordering::Release); + } + + pub fn push_snapshot(&self, snap: SystemSnapshot) { + self.broadcast_event(&MetricEvent::SystemSnapshot(snap.clone())); + let mut buf = self.snapshots.write(); + if buf.len() >= MAX_SNAPSHOTS { + buf.pop_front(); + } + buf.push_back(snap); + self.snapshots_dirty.store(true, Ordering::Release); + } + + pub fn take_agents_if_dirty(&self) -> Option> { + if self.agents_dirty.swap(false, Ordering::AcqRel) { + Some(self.agents.read().values().cloned().collect()) + } else { + None + } + } + + pub fn take_traces_if_dirty(&self) -> Option> { + if self.traces_dirty.swap(false, Ordering::AcqRel) { + Some(self.traces.read().iter().cloned().collect()) + } else { + None + } + } + + pub fn take_snapshots_if_dirty(&self) -> Option> { + if self.snapshots_dirty.swap(false, Ordering::AcqRel) { + Some(self.snapshots.read().iter().cloned().collect()) + } else { + None + } + } + + pub fn get_agents(&self) -> Vec { + self.agents.read().values().cloned().collect() + } + + pub fn get_latest_snapshot(&self) -> Option { + self.snapshots.read().back().cloned() + } + + pub fn get_traces(&self, n: usize) -> Vec { + let buf = self.traces.read(); + buf.iter().rev().take(n).rev().cloned().collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::{unix_ms, AgentState}; + + fn sample_agent(id: &str, state: AgentState) -> AgentStatus { + AgentStatus { + id: id.into(), + name: id.into(), + status: state, + last_seen_ms: unix_ms(), + } + } + + fn sample_snapshot() -> SystemSnapshot { + SystemSnapshot { + cpu_pct: 0.45, + memory_pct: 0.60, + gpu_pct: 0.0, + pipeline_latency_p50_ms: 120.0, + uptime_secs: 30, + timestamp_ms: unix_ms(), + } + } + + #[test] + fn dirty_flag_cleared_on_read() { + let store = MetricsStore::new(); + assert!(store.take_agents_if_dirty().is_none()); + + store.record_agent(sample_agent("n1", AgentState::Active)); + let agents = store.take_agents_if_dirty().unwrap(); + assert_eq!(agents.len(), 1); + + // Not dirty anymore + assert!(store.take_agents_if_dirty().is_none()); + } + + #[test] + fn agent_upsert() { + let store = MetricsStore::new(); + store.record_agent(sample_agent("n1", AgentState::Idle)); + store.record_agent(sample_agent("n1", AgentState::Active)); + + let agents = store.get_agents(); + assert_eq!(agents.len(), 1); + assert_eq!(agents[0].status, AgentState::Active); + } + + #[test] + fn trace_ring_buffer_cap() { + let store = MetricsStore::new(); + for i in 0..(MAX_TRACES + 50) { + store.push_trace(PipelineTrace { + asr_start_ms: i as u64, + asr_end_ms: i as u64 + 10, + llm_start_ms: 0, + llm_end_ms: 0, + tts_start_ms: 0, + tts_end_ms: 0, + tokens_per_sec: 0.0, + }); + } + let traces = store.get_traces(999); + assert_eq!(traces.len(), MAX_TRACES); + // Oldest evicted + assert_eq!(traces[0].asr_start_ms, 50); + } + + #[test] + fn snapshot_ring_buffer_cap() { + let store = MetricsStore::new(); + for _ in 0..(MAX_SNAPSHOTS + 10) { + store.push_snapshot(sample_snapshot()); + } + let snaps = store.take_snapshots_if_dirty().unwrap(); + assert_eq!(snaps.len(), MAX_SNAPSHOTS); + } + + #[test] + fn get_latest_snapshot_returns_newest() { + let store = MetricsStore::new(); + store.push_snapshot(SystemSnapshot { + cpu_pct: 0.1, + ..sample_snapshot() + }); + store.push_snapshot(SystemSnapshot { + cpu_pct: 0.9, + ..sample_snapshot() + }); + let latest = store.get_latest_snapshot().unwrap(); + assert!((latest.cpu_pct - 0.9).abs() < f64::EPSILON); + } +} diff --git a/libs/mofa-metrics/src/types.rs b/libs/mofa-metrics/src/types.rs new file mode 100644 index 00000000..12266426 --- /dev/null +++ b/libs/mofa-metrics/src/types.rs @@ -0,0 +1,74 @@ +// Core metric types for the Observatory dashboard. + +use serde::{Deserialize, Serialize}; +use std::time::{SystemTime, UNIX_EPOCH}; + +// Current Unix time in ms +pub fn unix_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + +// Node lifecycle state +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AgentState { + Offline, + Idle, + Active, + Error, +} + +impl Default for AgentState { + fn default() -> Self { + Self::Offline + } +} + +// Single node status +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentStatus { + pub id: String, + pub name: String, + pub status: AgentState, + pub last_seen_ms: u64, +} + +// Per stage timestamps for one ASR, LLM, TTS pipeline run +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PipelineTrace { + pub asr_start_ms: u64, + pub asr_end_ms: u64, + pub llm_start_ms: u64, + pub llm_end_ms: u64, + pub tts_start_ms: u64, + pub tts_end_ms: u64, + // LLM tokens/sec (0.0 if unknown) + pub tokens_per_sec: f64, +} + +// System metrics snapshot normalized to 0 to 1 +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SystemSnapshot { + // CPU + pub cpu_pct: f64, + // Memory + pub memory_pct: f64, + // GPU + pub gpu_pct: f64, + pub pipeline_latency_p50_ms: f64, + // Uptime + pub uptime_secs: u64, + pub timestamp_ms: u64, +} + +// WebSocket broadcast event +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", content = "data")] +pub enum MetricEvent { + AgentStatusChanged(AgentStatus), + PipelineCompleted(PipelineTrace), + SystemSnapshot(SystemSnapshot), +}