Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions mofa-dora-bridge/examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# mofa-dora-bridge examples

## Runtime backend demo (Task 15 Phase 1)

This example demonstrates runtime backend selection for Studio dataflow lifecycle:

- `dora-cli` (current default behavior)
- `mofa-native` (reserved for Task 15 follow-up implementation)

### Dry run (parsing + backend selection)

```bash
cargo run -p mofa-dora-bridge --example runtime_backend_demo -- apps/mofa-fm/dataflow/voice-chat.yml
```

### Start/stop lifecycle with Dora backend

```bash
MOFA_RUNTIME_BACKEND=dora-cli \
cargo run -p mofa-dora-bridge --example runtime_backend_demo -- apps/mofa-fm/dataflow/voice-chat.yml --start
```

### Verify explicit unsupported behavior for mofa-native (phase 1)

```bash
MOFA_RUNTIME_BACKEND=mofa-native \
cargo run -p mofa-dora-bridge --example runtime_backend_demo -- apps/mofa-fm/dataflow/voice-chat.yml --start
```

Expected output includes an explicit `Unsupported runtime backend` error message.

74 changes: 74 additions & 0 deletions mofa-dora-bridge/examples/runtime_backend_demo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use mofa_dora_bridge::{DataflowController, RuntimeBackend};
use std::env;
use std::process;

fn parse_backend(value: &str) -> Result<RuntimeBackend, String> {
match value.trim().to_ascii_lowercase().as_str() {
"dora" | "dora-cli" => Ok(RuntimeBackend::DoraCli),
"mofa" | "mofa-native" => Ok(RuntimeBackend::MofaNative),
other => Err(format!(
"unsupported backend '{}', expected one of: dora-cli, mofa-native",
other
)),
}
}

fn main() {
let args: Vec<String> = env::args().collect();
let dataflow_path = args
.get(1)
.cloned()
.unwrap_or_else(|| "apps/mofa-fm/dataflow/voice-chat.yml".to_string());
let backend_str = env::var("MOFA_RUNTIME_BACKEND").unwrap_or_else(|_| "dora-cli".to_string());
let start_requested = args.iter().any(|arg| arg == "--start");

let backend = match parse_backend(&backend_str) {
Ok(v) => v,
Err(e) => {
eprintln!("Invalid backend: {}", e);
process::exit(2);
}
};

println!("Runtime backend: {}", backend.as_str());
println!("Dataflow path: {}", dataflow_path);

let mut controller = match DataflowController::new_with_runtime(&dataflow_path, backend) {
Ok(c) => c,
Err(e) => {
eprintln!("Failed to create DataflowController: {}", e);
process::exit(1);
}
};

if let Some(parsed) = controller.parsed() {
println!(
"Parsed dataflow: nodes={}, mofa_nodes={}, env_requirements={}",
parsed.nodes.len(),
parsed.mofa_nodes.len(),
parsed.env_requirements.len()
);
}

if !start_requested {
println!("Dry run complete. Pass '--start' to execute lifecycle start/stop.");
return;
}

println!("Starting dataflow...");
match controller.start() {
Ok(dataflow_id) => {
println!("Dataflow started: {}", dataflow_id);
println!("Stopping dataflow...");
if let Err(e) = controller.stop() {
eprintln!("Failed to stop dataflow: {}", e);
process::exit(1);
}
println!("Dataflow stopped cleanly.");
}
Err(e) => {
eprintln!("Failed to start dataflow: {}", e);
process::exit(1);
}
}
}
115 changes: 114 additions & 1 deletion mofa-dora-bridge/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@ impl DataflowState {
}
}

/// Runtime backend used to execute a dataflow.
///
/// `DoraCli` preserves current behavior.
/// `MofaNative` is reserved for Task 15 migration work.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RuntimeBackend {
/// Execute dataflows via `dora` CLI.
#[default]
DoraCli,
/// Execute dataflows via native mofa-rs runtime (Task 15 target).
MofaNative,
}

impl RuntimeBackend {
pub fn as_str(&self) -> &'static str {
match self {
RuntimeBackend::DoraCli => "dora-cli",
RuntimeBackend::MofaNative => "mofa-native",
}
}
}

/// Controller for managing dataflow lifecycle
pub struct DataflowController {
/// Path to the dataflow YAML file
Expand All @@ -61,11 +83,21 @@ pub struct DataflowController {
env_vars: HashMap<String, String>,
/// Dora daemon process (if we started it)
daemon_process: Option<Child>,
/// Runtime backend to use for lifecycle operations
runtime_backend: RuntimeBackend,
}

impl DataflowController {
/// Create a new controller for a dataflow
pub fn new(dataflow_path: impl AsRef<Path>) -> BridgeResult<Self> {
Self::new_with_runtime(dataflow_path, RuntimeBackend::DoraCli)
}

/// Create a new controller for a dataflow with an explicit runtime backend.
pub fn new_with_runtime(
dataflow_path: impl AsRef<Path>,
runtime_backend: RuntimeBackend,
) -> BridgeResult<Self> {
let original_path = dataflow_path.as_ref();
// Canonicalize to avoid surprises when callers pass relative paths coming
// from different working directories. If canonicalize fails (e.g. missing
Expand All @@ -83,6 +115,7 @@ impl DataflowController {
state: Arc::new(RwLock::new(DataflowState::Stopped)),
env_vars: HashMap::new(),
daemon_process: None,
runtime_backend,
})
}

Expand All @@ -96,6 +129,11 @@ impl DataflowController {
self.state.read().clone()
}

/// Get the configured runtime backend.
pub fn runtime_backend(&self) -> RuntimeBackend {
self.runtime_backend
}

/// Set environment variable for the dataflow
pub fn set_env(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.env_vars.insert(key.into(), value.into());
Expand Down Expand Up @@ -123,6 +161,13 @@ impl DataflowController {

/// Ensure dora daemon is running
pub fn ensure_daemon(&mut self) -> BridgeResult<()> {
if self.runtime_backend != RuntimeBackend::DoraCli {
return Err(BridgeError::UnsupportedRuntime(format!(
"backend '{}' does not use dora daemon",
self.runtime_backend.as_str()
)));
}

// Check if daemon is already running by using `dora list`
// If it succeeds, daemon is running
let status = Command::new("dora")
Expand Down Expand Up @@ -158,6 +203,17 @@ impl DataflowController {

/// Start the dataflow
pub fn start(&mut self) -> BridgeResult<String> {
if self.runtime_backend != RuntimeBackend::DoraCli {
let msg = format!(
"backend '{}' is not wired for start() yet",
self.runtime_backend.as_str()
);
*self.state.write() = DataflowState::Error {
message: msg.clone(),
};
return Err(BridgeError::UnsupportedRuntime(msg));
}

// Check current state
{
let state = self.state.read();
Expand Down Expand Up @@ -202,7 +258,10 @@ impl DataflowController {
}

// Execute
eprintln!("[Controller] Executing: dora start {:?} --detach", self.dataflow_path);
eprintln!(
"[Controller] Executing: dora start {:?} --detach",
self.dataflow_path
);
info!("Starting dataflow: {:?}", self.dataflow_path);
let output = cmd.output().map_err(|e| {
eprintln!("[Controller] FAILED to execute dora: {}", e);
Expand Down Expand Up @@ -268,6 +327,13 @@ impl DataflowController {

/// Stop the dataflow with options
fn stop_with_options(&mut self, grace_duration: Option<Duration>) -> BridgeResult<()> {
if self.runtime_backend != RuntimeBackend::DoraCli {
return Err(BridgeError::UnsupportedRuntime(format!(
"backend '{}' is not wired for stop() yet",
self.runtime_backend.as_str()
)));
}

let dataflow_id = {
let state = self.state.read();
match &*state {
Expand Down Expand Up @@ -315,6 +381,13 @@ impl DataflowController {

/// Get dataflow status
pub fn get_status(&self) -> BridgeResult<DataflowStatus> {
if self.runtime_backend != RuntimeBackend::DoraCli {
return Err(BridgeError::UnsupportedRuntime(format!(
"backend '{}' is not wired for get_status() yet",
self.runtime_backend.as_str()
)));
}

let state = self.state.read().clone();

match state {
Expand Down Expand Up @@ -394,6 +467,46 @@ impl Drop for DataflowController {
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::path::PathBuf;

fn write_temp_dataflow() -> PathBuf {
let path =
std::env::temp_dir().join(format!("mofa-studio-test-{}.yml", uuid::Uuid::new_v4()));
fs::write(&path, "nodes: []\n").expect("failed to write temp dataflow");
path
}

#[test]
fn new_with_runtime_sets_backend() {
let path = write_temp_dataflow();
let controller = DataflowController::new_with_runtime(&path, RuntimeBackend::MofaNative)
.expect("controller should be created");
assert_eq!(controller.runtime_backend(), RuntimeBackend::MofaNative);
let _ = fs::remove_file(path);
}

#[test]
fn mofa_native_start_is_explicitly_unsupported_for_now() {
let path = write_temp_dataflow();
let mut controller =
DataflowController::new_with_runtime(&path, RuntimeBackend::MofaNative)
.expect("controller should be created");

let result = controller.start();
match result {
Err(BridgeError::UnsupportedRuntime(msg)) => {
assert!(msg.contains("mofa-native"));
}
other => panic!("expected UnsupportedRuntime error, got: {:?}", other),
}
let _ = fs::remove_file(path);
}
}

/// Dataflow status information
#[derive(Debug, Clone)]
pub struct DataflowStatus {
Expand Down
3 changes: 3 additions & 0 deletions mofa-dora-bridge/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub enum BridgeError {
#[error("Timeout: {0}")]
Timeout(String),

#[error("Unsupported runtime backend: {0}")]
UnsupportedRuntime(String),

#[error("Unknown error: {0}")]
Unknown(String),
}
11 changes: 5 additions & 6 deletions mofa-dora-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,22 @@ pub mod data;
pub mod dispatcher;
pub mod error;
pub mod parser;
pub mod remote_config;
pub mod remote_dataflow;
pub mod shared_state;

// Widget-specific bridges
pub mod widgets;

// Re-exports
pub use bridge::{BridgeState, DoraBridge};
pub use controller::{DataflowController, DataflowState};
pub use controller::{DataflowController, DataflowState, RuntimeBackend};
pub use data::{AudioData, ChatMessage, ControlCommand, DoraData, LogEntry};
pub use dispatcher::{DynamicNodeDispatcher, WidgetBinding};
pub use error::{BridgeError, BridgeResult};
pub use shared_state::{SharedDoraState, DoraStatus, ChatState, AudioState, DirtyVec, DirtyValue, MicState};
pub use widgets::AecControlCommand;
pub use parser::{DataflowParser, EnvRequirement, LogSource, ParsedDataflow, ParsedNode};
pub use remote_config::RemoteConfig;
pub use shared_state::{
AudioState, ChatState, DirtyValue, DirtyVec, DoraStatus, MicState, SharedDoraState,
};
pub use widgets::AecControlCommand;

/// Prefix for MoFA built-in dynamic nodes in dataflow YAML
pub const MOFA_NODE_PREFIX: &str = "mofa-";
Expand Down