diff --git a/crates/wassette/src/lib.rs b/crates/wassette/src/lib.rs index 8a9b24e3..972fe490 100644 --- a/crates/wassette/src/lib.rs +++ b/crates/wassette/src/lib.rs @@ -21,9 +21,9 @@ use etcetera::BaseStrategy; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::fs::DirEntry; -use tokio::sync::{RwLock, Semaphore}; +use tokio::sync::{Mutex, RwLock, Semaphore}; use tracing::{debug, info, instrument, warn}; -use wasmtime::component::{Component, InstancePre}; +use wasmtime::component::{Component, Instance, InstancePre}; use wasmtime::Store; mod component_storage; @@ -149,6 +149,22 @@ pub struct ComponentLoadOutcome { pub tool_names: Vec, } +/// Options for loading a component. +#[derive(Debug, Clone, Default)] +pub struct LoadOptions { + /// When true, the component's Store and Instance persist across tool calls, + /// enabling in-memory state and WASI resource continuity. + /// Concurrent calls to stateful components are serialized. + pub stateful: bool, +} + +/// A persistent Store/Instance pair for stateful components. +/// The mutex ensures concurrent calls are serialized. +struct StatefulInstance { + store: Store>, + instance: Instance, +} + impl ComponentRegistry { fn new() -> Self { Self::default() @@ -304,6 +320,11 @@ pub struct LifecycleManager { oci_client: Arc, http_client: reqwest::Client, secrets_manager: Arc, + /// Cached Store/Instance pairs for stateful components. + /// The outer Arc> serializes concurrent calls to the same component. + stateful_instances: Arc>>, + /// Tracks which components are loaded in stateful mode. + stateful_components: Arc>>, } /// A representation of a loaded component instance. It contains both the base component info and a @@ -370,6 +391,8 @@ impl LifecycleManager { oci_client, http_client, secrets_manager, + stateful_instances: Arc::new(Mutex::new(HashMap::new())), + stateful_components: Arc::new(RwLock::new(std::collections::HashSet::new())), }) } @@ -524,9 +547,32 @@ impl LifecycleManager { /// If a component with the given id already exists, it will be updated with the new component. /// Returns rich [`ComponentLoadOutcome`] information describing the loaded /// component and whether it replaced an existing instance. + /// + /// This is a convenience method that calls [`load_component_with_options`] with default options. #[instrument(skip(self))] pub async fn load_component(&self, uri: &str) -> Result { - debug!(uri, "Loading component"); + self.load_component_with_options(uri, LoadOptions::default()) + .await + } + + /// Loads a new component from the given URI with explicit options. + /// + /// If a component with the given id already exists, it will be updated with the new component. + /// Returns rich [`ComponentLoadOutcome`] information describing the loaded + /// component and whether it replaced an existing instance. + /// + /// # Options + /// + /// - `stateful`: When true, the component's Store and Instance persist across tool calls, + /// enabling in-memory state and WASI resource continuity. Concurrent calls to stateful + /// components are serialized. + #[instrument(skip(self))] + pub async fn load_component_with_options( + &self, + uri: &str, + options: LoadOptions, + ) -> Result { + debug!(uri, stateful = options.stateful, "Loading component"); let (component_id, resource) = self.resolve_component_resource(uri).await?; let staged_path = self .stage_component_artifact(&component_id, resource) @@ -541,10 +587,38 @@ impl LifecycleManager { ) })?; + // Clear any existing stateful instance on reload (state will be re-created on first call) + if outcome.status == LoadResult::Replaced { + let was_stateful = self + .stateful_components + .write() + .await + .remove(&outcome.component_id); + if was_stateful { + self.stateful_instances + .lock() + .await + .remove(&outcome.component_id); + debug!( + component_id = %outcome.component_id, + "Cleared previous stateful instance on reload" + ); + } + } + + // Track stateful components + if options.stateful { + self.stateful_components + .write() + .await + .insert(outcome.component_id.clone()); + } + info!( component_id = %outcome.component_id, status = ?outcome.status, tools = ?outcome.tool_names, + stateful = options.stateful, "Successfully loaded component" ); Ok(outcome) @@ -553,6 +627,8 @@ impl LifecycleManager { /// Unloads the component with the specified id. This removes the component from the runtime /// and removes all associated files from disk, making it the reverse operation of load_component. /// This function fails if any files cannot be removed (except when they don't exist). + /// + /// For stateful components, this also drops the cached Store/Instance, losing all in-memory state. #[instrument(skip(self))] pub async fn unload_component(&self, id: &str) -> Result<()> { debug!("Unloading component and removing files from disk"); @@ -574,6 +650,13 @@ impl LifecycleManager { self.registry.remove_component(id).await; self.policy_manager.cleanup(id).await; + // Clean up stateful instance if present + let was_stateful = self.stateful_components.write().await.remove(id); + if was_stateful { + self.stateful_instances.lock().await.remove(id); + debug!(component_id = %id, "Stateful instance dropped"); + } + info!(component_id = %id, "Component unloaded successfully"); Ok(()) } @@ -965,13 +1048,35 @@ impl LifecycleManager { component_id: &str, function_name: &str, parameters: &str, + ) -> Result { + let is_stateful = self + .stateful_components + .read() + .await + .contains(component_id); + + if is_stateful { + self.execute_stateful_component_call(component_id, function_name, parameters) + .await + } else { + self.execute_stateless_component_call(component_id, function_name, parameters) + .await + } + } + + /// Executes a function call on a stateless component (fresh Store/Instance per call) + async fn execute_stateless_component_call( + &self, + component_id: &str, + function_name: &str, + parameters: &str, ) -> Result { let start_time = Instant::now(); debug!( component_id = %component_id, function_name = %function_name, - "Starting WebAssembly component execution" + "Starting stateless WebAssembly component execution" ); let component = self @@ -1006,6 +1111,124 @@ impl LifecycleManager { "Component instance created" ); + let result = self + .execute_function_on_instance( + component_id, + function_name, + parameters, + &mut store, + &instance, + ) + .await?; + + let total_duration = start_time.elapsed(); + + debug!( + component_id = %component_id, + function_name = %function_name, + total_duration_ms = %total_duration.as_millis(), + instantiation_ms = %instantiation_duration.as_millis(), + "Stateless WebAssembly component execution completed" + ); + + Ok(result) + } + + /// Executes a function call on a stateful component (persistent Store/Instance) + /// Concurrent calls to the same stateful component are serialized via mutex. + async fn execute_stateful_component_call( + &self, + component_id: &str, + function_name: &str, + parameters: &str, + ) -> Result { + let start_time = Instant::now(); + + debug!( + component_id = %component_id, + function_name = %function_name, + "Starting stateful WebAssembly component execution" + ); + + // Acquire lock on stateful instances - this serializes concurrent calls + let mut stateful_instances = self.stateful_instances.lock().await; + + let component = self + .get_component(component_id) + .await + .ok_or_else(|| anyhow!("Component not found: {}", component_id))?; + + // Get or create the stateful instance + let is_first_call = !stateful_instances.contains_key(component_id); + + if is_first_call { + let (state, resource_limiter) = + self.get_wasi_state_for_component(component_id).await?; + + let mut store = Store::new(self.runtime.as_ref(), state); + + if resource_limiter.is_some() { + store.limiter(|state: &mut WassetteWasiState| { + state + .inner + .resource_limiter + .as_mut() + .expect("Resource limiter should be present - checked above") + }); + } + + let instantiation_start = Instant::now(); + let instance = component.instance_pre.instantiate_async(&mut store).await?; + let instantiation_duration = instantiation_start.elapsed(); + + debug!( + component_id = %component_id, + instantiation_ms = %instantiation_duration.as_millis(), + "Stateful component instance created (first call)" + ); + + stateful_instances.insert( + component_id.to_string(), + StatefulInstance { store, instance }, + ); + } + + let stateful_instance = stateful_instances + .get_mut(component_id) + .expect("StatefulInstance should exist - just created or already present"); + + let result = self + .execute_function_on_instance( + component_id, + function_name, + parameters, + &mut stateful_instance.store, + &stateful_instance.instance, + ) + .await?; + + let total_duration = start_time.elapsed(); + + debug!( + component_id = %component_id, + function_name = %function_name, + total_duration_ms = %total_duration.as_millis(), + first_call = is_first_call, + "Stateful WebAssembly component execution completed" + ); + + Ok(result) + } + + /// Execute a function on an existing Store/Instance pair + async fn execute_function_on_instance( + &self, + component_id: &str, + function_name: &str, + parameters: &str, + store: &mut Store>, + instance: &Instance, + ) -> Result { // Use the new function identifier lookup instead of dot-splitting let function_id = self .registry @@ -1020,11 +1243,11 @@ impl LifecycleManager { let func = if !interface_name.is_empty() { let interface_index = instance - .get_export_index(&mut store, None, interface_name) + .get_export_index(&mut *store, None, interface_name) .ok_or_else(|| anyhow!("Interface not found: {}", interface_name))?; let function_index = instance - .get_export_index(&mut store, Some(&interface_index), func_name) + .get_export_index(&mut *store, Some(&interface_index), func_name) .ok_or_else(|| { anyhow!( "Function not found in interface: {}.{}", @@ -1034,7 +1257,7 @@ impl LifecycleManager { })?; instance - .get_func(&mut store, function_index) + .get_func(&mut *store, function_index) .ok_or_else(|| { anyhow!( "Function not found in interface: {}.{}", @@ -1044,27 +1267,34 @@ impl LifecycleManager { })? } else { let func_index = instance - .get_export_index(&mut store, None, func_name) + .get_export_index(&mut *store, None, func_name) .ok_or_else(|| anyhow!("Function not found: {}", func_name))?; instance - .get_func(&mut store, func_index) + .get_func(&mut *store, func_index) .ok_or_else(|| anyhow!("Function not found: {}", func_name))? }; let params: serde_json::Value = serde_json::from_str(parameters)?; - let argument_vals = json_to_vals(¶ms, &func.params(&store))?; + let argument_vals = json_to_vals(¶ms, &func.params(&mut *store))?; - let mut results = create_placeholder_results(&func.results(&store)); + let mut results = create_placeholder_results(&func.results(&mut *store)); let execution_start = Instant::now(); // Execute the WASM function and capture any errors let call_result = func - .call_async(&mut store, &argument_vals, &mut results) + .call_async(&mut *store, &argument_vals, &mut results) .await; let execution_duration = execution_start.elapsed(); + debug!( + component_id = %component_id, + function_name = %function_name, + execution_ms = %execution_duration.as_millis(), + "Function execution completed" + ); + // If the call failed, check if it was due to a permission denial if let Err(e) = call_result { // Check if there was a permission error recorded during execution @@ -1078,17 +1308,6 @@ impl LifecycleManager { let result_json = vals_to_json(&results); - let total_duration = start_time.elapsed(); - - debug!( - component_id = %component_id, - function_name = %function_name, - total_duration_ms = %total_duration.as_millis(), - instantiation_ms = %instantiation_duration.as_millis(), - execution_ms = %execution_duration.as_millis(), - "WebAssembly component execution completed" - ); - if let Some(result_str) = result_json.as_str() { Ok(result_str.to_string()) } else { diff --git a/docs/concepts.md b/docs/concepts.md index 23b3ac6d..dd3d43ce 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -79,6 +79,41 @@ graph LR When you load a component in Wassette, the system first loads the WebAssembly component using the Wasmtime runtime, then examines the component's WIT interface to discover exported functions. Each function's parameters and return types are converted to JSON Schema, and each function becomes an MCP tool with a name, description, and parameter schema. When an AI agent calls a tool, Wassette executes the corresponding function in the sandboxed Wasm environment. +### Stateful vs Stateless Components + +By default, Wassette creates a fresh WebAssembly Store and Instance for each tool call. This **stateless mode** ensures complete isolation between calls but means components cannot maintain in-memory state. + +For components that need to preserve state across calls (caches, counters, open file handles, database connections), Wassette supports **stateful mode**: + +```rust +// Load a component in stateful mode +manager.load_component_with_options( + "oci://example/memory-component", + LoadOptions { stateful: true } +).await?; +``` + +**Key characteristics of stateful mode:** + +- **Persistent Store/Instance**: The WebAssembly Store and Instance persist across tool calls +- **In-memory state**: Component globals, heap allocations, and static data survive between calls +- **Resource continuity**: WASI resources (file handles, sockets) remain valid across calls +- **Serialized execution**: Concurrent calls to the same stateful component are queued to prevent race conditions +- **Process-scoped lifetime**: State persists until the component is unloaded or the process exits + +**When to use stateful mode:** + +- Knowledge graph or memory components that accumulate data +- Components with expensive initialization (loading models, parsing configs) +- Components managing persistent connections or sessions +- Caching layers that benefit from warm state + +**When to use stateless mode (default):** + +- Stateless utility functions (formatting, calculations) +- Components where isolation between calls is important +- When you want predictable, reproducible behavior + ### Example Flow ```mermaid diff --git a/tests/stateful_component_test.rs b/tests/stateful_component_test.rs new file mode 100644 index 00000000..2330f74b --- /dev/null +++ b/tests/stateful_component_test.rs @@ -0,0 +1,223 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//! Integration tests for stateful component mode. +//! +//! These tests verify that components loaded with `stateful: true` maintain +//! their Store/Instance across multiple tool calls. + +use std::sync::Arc; + +use anyhow::{Context, Result}; +use tempfile::TempDir; +use test_log::test; +use wassette::{LifecycleManager, LoadOptions}; + +mod common; +use common::build_filesystem_component; + +async fn setup_lifecycle_manager() -> Result<(Arc, TempDir)> { + let tempdir = tempfile::tempdir()?; + + let manager = Arc::new( + LifecycleManager::new_unloaded(&tempdir) + .await + .context("Failed to create LifecycleManager")?, + ); + + Ok((manager, tempdir)) +} + +#[test(tokio::test)] +async fn test_load_options_default() { + let options = LoadOptions::default(); + assert!(!options.stateful, "Default LoadOptions should be stateless"); +} + +#[test(tokio::test)] +async fn test_load_options_stateful() { + let options = LoadOptions { stateful: true }; + assert!(options.stateful, "Stateful option should be true"); +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test(tokio::test)] +async fn test_load_component_with_stateful_option() -> Result<()> { + let (manager, _tempdir) = setup_lifecycle_manager().await?; + + let component_path = build_filesystem_component().await?; + let uri = format!("file://{}", component_path.to_str().unwrap()); + + // Load with stateful: true + let outcome = manager + .load_component_with_options(&uri, LoadOptions { stateful: true }) + .await?; + + assert_eq!(outcome.component_id, "filesystem"); + + // Verify the component is tracked as stateful by checking that + // execute_component_call works (which internally checks stateful status) + let result = manager + .execute_component_call( + &outcome.component_id, + "list-directory", + r#"{"path": "/tmp"}"#, + ) + .await; + + // The call should work (may fail due to permissions, but that's expected) + // The important thing is that it doesn't panic due to stateful handling + match result { + Ok(_) => {} + Err(e) => { + // Permission errors are expected without granting permissions + let error_msg = e.to_string(); + assert!( + error_msg.contains("Failed to read directory") + || error_msg.contains("permission") + || error_msg.contains("denied") + || error_msg.contains("storage"), + "Unexpected error: {}", + error_msg + ); + } + } + + Ok(()) +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test(tokio::test)] +async fn test_stateful_component_multiple_calls() -> Result<()> { + let (manager, _tempdir) = setup_lifecycle_manager().await?; + + let component_path = build_filesystem_component().await?; + let uri = format!("file://{}", component_path.to_str().unwrap()); + + // Load with stateful: true + let outcome = manager + .load_component_with_options(&uri, LoadOptions { stateful: true }) + .await?; + + // Make multiple calls - these should reuse the same Store/Instance + for _ in 0..3 { + let _ = manager + .execute_component_call( + &outcome.component_id, + "list-directory", + r#"{"path": "/tmp"}"#, + ) + .await; + } + + // If we got here without panicking, the stateful execution path works + Ok(()) +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test(tokio::test)] +async fn test_stateful_component_unload_clears_state() -> Result<()> { + let (manager, _tempdir) = setup_lifecycle_manager().await?; + + let component_path = build_filesystem_component().await?; + let uri = format!("file://{}", component_path.to_str().unwrap()); + + // Load with stateful: true + let outcome = manager + .load_component_with_options(&uri, LoadOptions { stateful: true }) + .await?; + + // Make a call to create the stateful instance + let _ = manager + .execute_component_call( + &outcome.component_id, + "list-directory", + r#"{"path": "/tmp"}"#, + ) + .await; + + // Unload should succeed and clear stateful state + manager.unload_component(&outcome.component_id).await?; + + // Verify component is unloaded + let components = manager.list_components().await; + assert!( + components.is_empty(), + "Component should be unloaded, but found: {:?}", + components + ); + + Ok(()) +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test(tokio::test)] +async fn test_stateful_component_reload_clears_previous_state() -> Result<()> { + let (manager, _tempdir) = setup_lifecycle_manager().await?; + + let component_path = build_filesystem_component().await?; + let uri = format!("file://{}", component_path.to_str().unwrap()); + + // Load with stateful: true + let outcome = manager + .load_component_with_options(&uri, LoadOptions { stateful: true }) + .await?; + + // Make a call to create the stateful instance + let _ = manager + .execute_component_call( + &outcome.component_id, + "list-directory", + r#"{"path": "/tmp"}"#, + ) + .await; + + // Reload the component (should clear previous stateful state) + let reload_outcome = manager + .load_component_with_options(&uri, LoadOptions { stateful: true }) + .await?; + + assert_eq!(reload_outcome.component_id, outcome.component_id); + assert_eq!( + reload_outcome.status, + wassette::LoadResult::Replaced, + "Reload should replace existing component" + ); + + // Make another call - should work with fresh state + let _ = manager + .execute_component_call( + &outcome.component_id, + "list-directory", + r#"{"path": "/tmp"}"#, + ) + .await; + + Ok(()) +} + +#[cfg(any(target_os = "linux", target_os = "macos"))] +#[test(tokio::test)] +async fn test_stateless_component_default_behavior() -> Result<()> { + let (manager, _tempdir) = setup_lifecycle_manager().await?; + + let component_path = build_filesystem_component().await?; + let uri = format!("file://{}", component_path.to_str().unwrap()); + + // Load without stateful option (default stateless) + let outcome = manager.load_component(&uri).await?; + + // Make multiple calls - each should create fresh Store/Instance + for _ in 0..3 { + let _ = manager + .execute_component_call( + &outcome.component_id, + "list-directory", + r#"{"path": "/tmp"}"#, + ) + .await; + } + + // If we got here without panicking, stateless execution still works + Ok(()) +}