diff --git a/README.md b/README.md index b1ec481..df34d70 100644 --- a/README.md +++ b/README.md @@ -5,17 +5,25 @@ [![License](https://img.shields.io/badge/license-MIT-purple)](LICENSE) [![PyPI Version](https://img.shields.io/pypi/v/truenas-mcp-server)](https://pypi.org/project/truenas-mcp-server/) -A production-ready Model Context Protocol (MCP) server for TrueNAS Core systems. Control and manage your TrueNAS storage through natural language with Claude or other MCP-compatible clients. +A production-ready Model Context Protocol (MCP) server for **TrueNAS Core and SCALE** systems. Control and manage your TrueNAS storage and virtualization through natural language with Claude or other MCP-compatible clients. + +**Automatic variant detection**: The server automatically detects whether you're connected to TrueNAS Core or SCALE and enables the appropriate features. ## 🚀 Features -### Core Capabilities +### Universal Features (Core & SCALE) - **User Management** - Create, update, delete users and manage permissions -- **Storage Management** - Manage pools, datasets, volumes with full ZFS support +- **Storage Management** - Manage pools, datasets, volumes with full ZFS support - **File Sharing** - Configure SMB, NFS, and iSCSI shares - **Snapshot Management** - Create, delete, rollback snapshots with automation - **System Monitoring** - Check system health, pool status, and resource usage +### TrueNAS SCALE Features (24.04+) +*Automatically enabled when connected to SCALE* +- **Apps** - Manage Docker Compose-based TrueNAS applications +- **Incus Instances** - Control Incus VMs and containers (SCALE 25.04+) +- **Legacy VMs** - Manage bhyve virtual machines + ### Enterprise Features - **Type-Safe Operations** - Full Pydantic models for request/response validation - **Comprehensive Error Handling** - Detailed error messages and recovery guidance @@ -123,6 +131,13 @@ Once configured, you can interact with TrueNAS using natural language: "Set up an SMB share for the documents dataset" "Create a snapshot of all datasets in the tank pool" "Show me users who have sudo privileges" + +# TrueNAS SCALE virtualization examples +"List all running apps and their status" +"Get the configuration for the sonarr app" +"Show me all Incus VMs and containers" +"Update the crypto-nodes VM to use 8 CPUs" +"Restart the plex app" ``` ### As a Python Library @@ -199,6 +214,34 @@ asyncio.run(main()) - `clone_snapshot` - Clone to new dataset - `create_snapshot_task` - Setup automated snapshots +### App Management (TrueNAS SCALE) +- `list_apps` - Show all TrueNAS apps with status +- `get_app` - Get detailed app information +- `get_app_config` - Get full app configuration +- `start_app` - Start an app +- `stop_app` - Stop an app +- `restart_app` - Restart an app +- `redeploy_app` - Redeploy after config changes +- `update_app_config` - Update app configuration + +### Incus Instance Management (TrueNAS SCALE) +- `list_instances` - Show VMs and containers +- `get_instance` - Get instance details +- `start_instance` - Start an instance +- `stop_instance` - Stop an instance +- `restart_instance` - Restart an instance +- `update_instance` - Update CPU/memory/autostart +- `list_instance_devices` - Show attached devices + +### Legacy VM Management +- `list_legacy_vms` - Show bhyve VMs +- `get_legacy_vm` - Get VM details +- `start_legacy_vm` - Start a VM +- `stop_legacy_vm` - Stop a VM +- `restart_legacy_vm` - Restart a VM +- `update_legacy_vm` - Update VM configuration +- `get_legacy_vm_status` - Get VM status + ### Debug Tools (Development Mode) - `debug_connection` - Check connection settings - `test_connection` - Verify API connectivity @@ -221,14 +264,20 @@ truenas_mcp_server/ │ ├── base.py # Base models │ ├── user.py # User models │ ├── storage.py # Storage models -│ └── sharing.py # Share models +│ ├── sharing.py # Share models +│ ├── app.py # App models (SCALE) +│ ├── instance.py # Incus instance models (SCALE) +│ └── vm.py # Legacy VM models ├── tools/ # MCP tools │ ├── __init__.py │ ├── base.py # Base tool class │ ├── users.py # User tools │ ├── storage.py # Storage tools │ ├── sharing.py # Share tools -│ └── snapshots.py # Snapshot tools +│ ├── snapshots.py # Snapshot tools +│ ├── apps.py # App tools (SCALE) +│ ├── instances.py # Incus instance tools (SCALE) +│ └── vms.py # Legacy VM tools └── exceptions.py # Custom exceptions ``` diff --git a/tests/test_virtualization_live.py b/tests/test_virtualization_live.py new file mode 100644 index 0000000..4b57778 --- /dev/null +++ b/tests/test_virtualization_live.py @@ -0,0 +1,243 @@ +""" +Live integration tests for TrueNAS virtualization tools + +These tests run against actual TrueNAS servers and require: +- TRUENAS_URL and TRUENAS_API_KEY environment variables set +- Test resources created manually or by this test + +Test resources use the 'mcp-test-' prefix to avoid conflicts with production. +""" + +import asyncio +import os +import pytest +from typing import Optional + +# Skip all tests if not configured +pytestmark = pytest.mark.skipif( + not os.environ.get("TRUENAS_URL"), + reason="TRUENAS_URL not set - skipping live tests" +) + + +class TestAppToolsLive: + """Live tests for AppTools""" + + @pytest.fixture + def app_tools(self): + from truenas_mcp_server.tools.apps import AppTools + return AppTools() + + @pytest.mark.asyncio + async def test_list_apps(self, app_tools): + """Test listing all apps (read-only, safe)""" + result = await app_tools.list_apps() + assert result["success"] is True + assert "apps" in result + assert "metadata" in result + print(f"\nFound {result['metadata']['total_apps']} apps") + for app in result["apps"]: + print(f" - {app['name']}: {app['state']}") + + @pytest.mark.asyncio + async def test_get_app_existing(self, app_tools): + """Test getting an existing app""" + # First list apps to find one + list_result = await app_tools.list_apps() + if not list_result["apps"]: + pytest.skip("No apps found to test") + + app_name = list_result["apps"][0]["name"] + result = await app_tools.get_app(app_name) + assert result["success"] is True + assert result["app"]["name"] == app_name + + @pytest.mark.asyncio + async def test_get_app_config_existing(self, app_tools): + """Test getting app config (uses quirky plain string body!)""" + list_result = await app_tools.list_apps() + if not list_result["apps"]: + pytest.skip("No apps found to test") + + app_name = list_result["apps"][0]["name"] + result = await app_tools.get_app_config(app_name) + assert result["success"] is True + assert result["app_name"] == app_name + assert "config" in result + print(f"\nApp {app_name} config keys: {list(result['config'].keys()) if result['config'] else 'empty'}") + + @pytest.mark.asyncio + async def test_get_app_nonexistent(self, app_tools): + """Test getting a non-existent app""" + result = await app_tools.get_app("nonexistent-app-xyz123") + assert result["success"] is False + assert "not found" in result["error"].lower() + + +class TestInstanceToolsLive: + """Live tests for InstanceTools""" + + @pytest.fixture + def instance_tools(self): + from truenas_mcp_server.tools.instances import InstanceTools + return InstanceTools() + + @pytest.mark.asyncio + async def test_list_instances(self, instance_tools): + """Test listing all Incus instances (read-only, safe)""" + result = await instance_tools.list_instances() + assert result["success"] is True + assert "instances" in result + assert "metadata" in result + print(f"\nFound {result['metadata']['total_instances']} instances") + for inst in result["instances"]: + print(f" - {inst['name']} ({inst['type']}): {inst['status']} - {inst['cpu']} CPU, {inst['memory_gb']}GB RAM") + + @pytest.mark.asyncio + async def test_list_instances_filter_vm(self, instance_tools): + """Test filtering instances by type""" + result = await instance_tools.list_instances(instance_type="VM") + assert result["success"] is True + for inst in result["instances"]: + assert inst["type"] == "VM" + + @pytest.mark.asyncio + async def test_list_instances_filter_container(self, instance_tools): + """Test filtering instances by type""" + result = await instance_tools.list_instances(instance_type="CONTAINER") + assert result["success"] is True + for inst in result["instances"]: + assert inst["type"] == "CONTAINER" + + @pytest.mark.asyncio + async def test_get_instance_existing(self, instance_tools): + """Test getting an existing instance""" + list_result = await instance_tools.list_instances() + if not list_result["instances"]: + pytest.skip("No instances found to test") + + inst_name = list_result["instances"][0]["name"] + result = await instance_tools.get_instance(inst_name) + assert result["success"] is True + assert result["instance"]["name"] == inst_name + + @pytest.mark.asyncio + async def test_get_instance_nonexistent(self, instance_tools): + """Test getting a non-existent instance""" + result = await instance_tools.get_instance("nonexistent-instance-xyz123") + assert result["success"] is False + assert "not found" in result["error"].lower() + + @pytest.mark.asyncio + async def test_list_instance_devices(self, instance_tools): + """Test listing devices for an instance""" + list_result = await instance_tools.list_instances() + if not list_result["instances"]: + pytest.skip("No instances found to test") + + inst_name = list_result["instances"][0]["name"] + result = await instance_tools.list_instance_devices(inst_name) + assert result["success"] is True + assert "devices" in result + print(f"\nInstance {inst_name} has {result['metadata']['device_count']} devices") + + +class TestLegacyVMToolsLive: + """Live tests for LegacyVMTools""" + + @pytest.fixture + def vm_tools(self): + from truenas_mcp_server.tools.vms import LegacyVMTools + return LegacyVMTools() + + @pytest.mark.asyncio + async def test_list_legacy_vms(self, vm_tools): + """Test listing all legacy VMs (read-only, safe)""" + result = await vm_tools.list_legacy_vms() + assert result["success"] is True + assert "vms" in result + assert "metadata" in result + print(f"\nFound {result['metadata']['total_vms']} legacy VMs") + for vm in result["vms"]: + print(f" - {vm['name']} (ID: {vm['id']}): {vm['status']} - {vm['vcpus']} vCPU, {vm['memory_mb']}MB RAM") + + @pytest.mark.asyncio + async def test_get_legacy_vm_existing(self, vm_tools): + """Test getting an existing legacy VM""" + list_result = await vm_tools.list_legacy_vms() + if not list_result["vms"]: + pytest.skip("No legacy VMs found to test") + + vm_id = list_result["vms"][0]["id"] + result = await vm_tools.get_legacy_vm(vm_id) + assert result["success"] is True + assert result["vm"]["id"] == vm_id + + @pytest.mark.asyncio + async def test_get_legacy_vm_nonexistent(self, vm_tools): + """Test getting a non-existent legacy VM""" + result = await vm_tools.get_legacy_vm(99999) + assert result["success"] is False + assert "not found" in result["error"].lower() + + +class TestIntegration: + """Integration tests that exercise multiple tools together""" + + @pytest.mark.asyncio + async def test_full_workflow(self): + """ + Test a full workflow: + 1. List resources + 2. Get details of each type + 3. Verify API quirks are handled correctly + """ + from truenas_mcp_server.tools.apps import AppTools + from truenas_mcp_server.tools.instances import InstanceTools + from truenas_mcp_server.tools.vms import LegacyVMTools + + app_tools = AppTools() + instance_tools = InstanceTools() + vm_tools = LegacyVMTools() + + # List all resource types + apps = await app_tools.list_apps() + instances = await instance_tools.list_instances() + vms = await vm_tools.list_legacy_vms() + + print("\n=== TrueNAS Virtualization Summary ===") + print(f"Apps: {apps['metadata']['total_apps']}") + print(f"Incus Instances: {instances['metadata']['total_instances']}") + print(f"Legacy VMs: {vms['metadata']['total_vms']}") + + # Verify all succeeded + assert apps["success"] + assert instances["success"] + assert vms["success"] + + +if __name__ == "__main__": + # Run quick smoke test + async def smoke_test(): + from truenas_mcp_server.tools.apps import AppTools + from truenas_mcp_server.tools.instances import InstanceTools + from truenas_mcp_server.tools.vms import LegacyVMTools + + print("Testing AppTools...") + app_tools = AppTools() + apps = await app_tools.list_apps() + print(f" Found {apps['metadata']['total_apps']} apps") + + print("Testing InstanceTools...") + instance_tools = InstanceTools() + instances = await instance_tools.list_instances() + print(f" Found {instances['metadata']['total_instances']} instances") + + print("Testing LegacyVMTools...") + vm_tools = LegacyVMTools() + vms = await vm_tools.list_legacy_vms() + print(f" Found {vms['metadata']['total_vms']} legacy VMs") + + print("\nAll smoke tests passed!") + + asyncio.run(smoke_test()) diff --git a/truenas_mcp_server/client/__init__.py b/truenas_mcp_server/client/__init__.py index 74c10fc..232b4e9 100644 --- a/truenas_mcp_server/client/__init__.py +++ b/truenas_mcp_server/client/__init__.py @@ -2,6 +2,6 @@ HTTP Client management for TrueNAS API interactions """ -from .http_client import TrueNASClient, get_client, close_client +from .http_client import TrueNASClient, TrueNASVariant, get_client, close_client -__all__ = ["TrueNASClient", "get_client", "close_client"] \ No newline at end of file +__all__ = ["TrueNASClient", "TrueNASVariant", "get_client", "close_client"] \ No newline at end of file diff --git a/truenas_mcp_server/client/http_client.py b/truenas_mcp_server/client/http_client.py index 551411c..b167f33 100644 --- a/truenas_mcp_server/client/http_client.py +++ b/truenas_mcp_server/client/http_client.py @@ -6,6 +6,7 @@ import logging from typing import Optional, Dict, Any, Union from functools import wraps +from enum import Enum import httpx from httpx import Response, HTTPError, TimeoutException, ConnectError @@ -19,6 +20,13 @@ TrueNASRateLimitError ) + +class TrueNASVariant(str, Enum): + """TrueNAS product variant""" + CORE = "core" # TrueNAS Core (FreeBSD-based) + SCALE = "scale" # TrueNAS SCALE (Linux-based) + UNKNOWN = "unknown" + logger = logging.getLogger(__name__) @@ -83,7 +91,7 @@ class TrueNASClient: def __init__(self, settings=None): """ Initialize the TrueNAS client - + Args: settings: Optional Settings instance (uses get_settings() if not provided) """ @@ -91,6 +99,9 @@ def __init__(self, settings=None): self._client = None self._request_count = 0 self._error_count = 0 + self._variant: TrueNASVariant = TrueNASVariant.UNKNOWN + self._version: Optional[str] = None + self._system_info: Optional[Dict[str, Any]] = None async def __aenter__(self): """Async context manager entry""" @@ -246,6 +257,50 @@ async def put(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dic return response.json() + @retry_on_failure() + async def post_raw( + self, + endpoint: str, + data: str, + content_type: str = "application/json" + ) -> Dict[str, Any]: + """ + Send POST request with raw string body to TrueNAS API + + Some TrueNAS API endpoints expect a plain string body rather than + a JSON object. For example: + - POST /app/config expects: "app_name" (plain quoted string) + - POST /virt/instance/start expects: "instance_name" (plain quoted string) + + Args: + endpoint: API endpoint (relative to base URL) + data: Raw string body (should include quotes if needed) + content_type: Content-Type header (default: application/json) + + Returns: + Response data as dictionary + """ + await self.ensure_connected() + + self._log_request("POST", endpoint) + logger.debug(f"Raw body: {data}") + + response = await self._client.post( + endpoint, + content=data.encode(), + headers={"Content-Type": content_type} + ) + self._log_response(response) + + if response.status_code >= 400: + self._handle_error_response(response) + + # Handle empty responses + if not response.content: + return {} + + return response.json() + @retry_on_failure() async def delete(self, endpoint: str) -> bool: """ @@ -276,6 +331,77 @@ def get_stats(self) -> Dict[str, int]: "error_rate": self._error_count / max(self._request_count, 1) } + @property + def variant(self) -> TrueNASVariant: + """Get the detected TrueNAS variant""" + return self._variant + + @property + def version(self) -> Optional[str]: + """Get the TrueNAS version string""" + return self._version + + @property + def is_scale(self) -> bool: + """Check if connected to TrueNAS SCALE""" + return self._variant == TrueNASVariant.SCALE + + @property + def is_core(self) -> bool: + """Check if connected to TrueNAS Core""" + return self._variant == TrueNASVariant.CORE + + async def detect_variant(self) -> TrueNASVariant: + """ + Detect the TrueNAS variant (Core vs SCALE) by querying system info + + TrueNAS SCALE returns system info with 'version' containing 'SCALE' + TrueNAS Core returns system info with 'version' containing 'CORE' + + Returns: + TrueNASVariant enum value + """ + await self.ensure_connected() + + try: + # Query system info + system_info = await self.get("/system/info") + self._system_info = system_info + + # Extract version info + version = system_info.get("version", "") + self._version = version + + # Detect variant from version string + version_upper = version.upper() + if "SCALE" in version_upper: + self._variant = TrueNASVariant.SCALE + logger.info(f"Detected TrueNAS SCALE: {version}") + elif "CORE" in version_upper: + self._variant = TrueNASVariant.CORE + logger.info(f"Detected TrueNAS Core: {version}") + else: + # Additional detection: SCALE is Linux-based + # Try querying an endpoint that only exists on SCALE + try: + await self.get("/app") + self._variant = TrueNASVariant.SCALE + logger.info(f"Detected TrueNAS SCALE (via /app endpoint): {version}") + except TrueNASAPIError: + # /app doesn't exist - likely Core + self._variant = TrueNASVariant.CORE + logger.info(f"Detected TrueNAS Core (via /app 404): {version}") + + except Exception as e: + logger.warning(f"Failed to detect TrueNAS variant: {e}") + self._variant = TrueNASVariant.UNKNOWN + + return self._variant + + def get_system_info(self) -> Optional[Dict[str, Any]]: + """Get cached system info (call detect_variant first)""" + return self._system_info + # Global client instance _client: Optional[TrueNASClient] = None diff --git a/truenas_mcp_server/models/__init__.py b/truenas_mcp_server/models/__init__.py index 01430d0..3c25614 100644 --- a/truenas_mcp_server/models/__init__.py +++ b/truenas_mcp_server/models/__init__.py @@ -6,6 +6,16 @@ from .user import User, UserCreate, UserUpdate from .storage import Pool, Dataset, DatasetCreate, Snapshot from .sharing import SMBShare, NFSExport, ISCSITarget +from .app import App, AppConfig, AppState, AppSummary +from .instance import ( + IncusInstance, + InstanceType, + InstanceStatus, + InstanceSummary, + InstanceDevice, + InstanceUpdateRequest +) +from .vm import LegacyVM, VMStatus, LegacyVMSummary, LegacyVMUpdateRequest __all__ = [ "BaseModel", @@ -20,4 +30,21 @@ "SMBShare", "NFSExport", "ISCSITarget", + # App models + "App", + "AppConfig", + "AppState", + "AppSummary", + # Instance models (Incus) + "IncusInstance", + "InstanceType", + "InstanceStatus", + "InstanceSummary", + "InstanceDevice", + "InstanceUpdateRequest", + # Legacy VM models + "LegacyVM", + "VMStatus", + "LegacyVMSummary", + "LegacyVMUpdateRequest", ] \ No newline at end of file diff --git a/truenas_mcp_server/models/app.py b/truenas_mcp_server/models/app.py new file mode 100644 index 0000000..fcae413 --- /dev/null +++ b/truenas_mcp_server/models/app.py @@ -0,0 +1,113 @@ +""" +TrueNAS App models for Docker Compose-based applications + +TrueNAS SCALE 25.04+ uses the /api/v2.0/app endpoints for app management. +""" + +from enum import Enum +from typing import Any, Dict, List, Optional +from pydantic import Field + +from .base import BaseModel + + +class AppState(str, Enum): + """Application deployment states""" + RUNNING = "RUNNING" + STOPPED = "STOPPED" + DEPLOYING = "DEPLOYING" + CRASHED = "CRASHED" + STOPPING = "STOPPING" + STARTING = "STARTING" + + +class AppResourceLimits(BaseModel): + """Resource limits for an app""" + cpus: int = Field(default=2, description="Number of CPU cores") + memory: int = Field(default=4096, description="Memory limit in MB") + + +class AppResources(BaseModel): + """Resource configuration for an app""" + limits: AppResourceLimits = Field( + default_factory=AppResourceLimits, + description="Resource limits" + ) + + +class AppStorageMount(BaseModel): + """Storage mount configuration for an app""" + mount_path: str = Field(..., description="Path inside container") + path: str = Field(..., description="Host path") + read_only: bool = Field(default=False, description="Whether mount is read-only") + + +class AppStorageConfig(BaseModel): + """Storage configuration for an app""" + config: Optional[Dict[str, Any]] = Field( + None, + description="Config storage settings" + ) + additional: List[AppStorageMount] = Field( + default_factory=list, + description="Additional storage mounts" + ) + + +class AppNetworkConfig(BaseModel): + """Network configuration for an app""" + host_network: bool = Field(default=False, description="Use host network") + port: Optional[int] = Field(None, description="Primary port mapping") + + +class AppRunAsConfig(BaseModel): + """User/group configuration for an app""" + user: int = Field(..., description="UID to run as") + group: int = Field(..., description="GID to run as") + + +class App(BaseModel): + """ + TrueNAS App model + + Represents a Docker Compose-based application managed by TrueNAS. + """ + id: str = Field(..., description="App identifier (same as name)") + name: str = Field(..., description="App name") + state: AppState = Field(..., description="Current deployment state") + version: Optional[str] = Field(None, description="App version") + human_version: Optional[str] = Field(None, description="Human-readable version") + upgrade_available: bool = Field(default=False, description="Whether upgrade is available") + portal: Optional[Dict[str, Any]] = Field(None, description="Web portal information") + metadata: Optional[Dict[str, Any]] = Field(None, description="App metadata") + + +class AppConfig(BaseModel): + """ + Full app configuration as returned by /app/config endpoint + + This is the complete configuration structure that can be retrieved + and modified via the TrueNAS API. + """ + resources: Optional[AppResources] = Field(None, description="Resource configuration") + storage: Optional[Dict[str, Any]] = Field(None, description="Storage configuration") + network: Optional[Dict[str, Any]] = Field(None, description="Network configuration") + run_as: Optional[Dict[str, Any]] = Field(None, description="Run as configuration") + timezone: Optional[str] = Field(None, description="App timezone") + + # Additional fields vary by app type - stored as extra + extra: Dict[str, Any] = Field( + default_factory=dict, + description="Additional app-specific configuration" + ) + + +class AppSummary(BaseModel): + """ + Summary information about an app for list operations + """ + name: str = Field(..., description="App name") + state: str = Field(..., description="Current state") + version: Optional[str] = Field(None, description="App version") + upgrade_available: bool = Field(default=False, description="Upgrade available") + portal_url: Optional[str] = Field(None, description="Web portal URL if available") diff --git a/truenas_mcp_server/models/instance.py b/truenas_mcp_server/models/instance.py new file mode 100644 index 0000000..239ae69 --- /dev/null +++ b/truenas_mcp_server/models/instance.py @@ -0,0 +1,125 @@ +""" +TrueNAS Incus Instance models for VMs and Containers + +TrueNAS SCALE 25.04+ uses Incus for virtualization via /api/v2.0/virt/instance. +This replaces the older bhyve-based /api/v2.0/vm API for new deployments. +""" + +from enum import Enum +from typing import Any, Dict, List, Optional +from pydantic import Field + +from .base import BaseModel + + +class InstanceType(str, Enum): + """Incus instance types""" + VM = "VM" + CONTAINER = "CONTAINER" + + +class InstanceStatus(str, Enum): + """Instance runtime states""" + RUNNING = "RUNNING" + STOPPED = "STOPPED" + STARTING = "STARTING" + STOPPING = "STOPPING" + ERROR = "ERROR" + + +class DeviceType(str, Enum): + """Types of devices that can be attached to instances""" + DISK = "DISK" + NIC = "NIC" + GPU = "GPU" + USB = "USB" + TPM = "TPM" + PROXY = "PROXY" + + +class InstanceDevice(BaseModel): + """ + Device attached to an Incus instance + """ + name: str = Field(..., description="Device name") + type: DeviceType = Field(..., description="Device type") + source: Optional[str] = Field(None, description="Source path or identifier") + destination: Optional[str] = Field(None, description="Destination path in instance") + readonly: bool = Field(default=False, description="Read-only mount") + extra: Dict[str, Any] = Field( + default_factory=dict, + description="Additional device-specific configuration" + ) + + +class InstanceNIC(BaseModel): + """ + Network interface configuration for an instance + """ + name: str = Field(..., description="Interface name") + type: str = Field(default="bridged", description="NIC type") + parent: Optional[str] = Field(None, description="Parent bridge/network") + hwaddr: Optional[str] = Field(None, description="MAC address") + + +class IncusInstance(BaseModel): + """ + Incus Instance model (VM or Container) + + Represents a virtualization instance managed by Incus on TrueNAS. + """ + id: str = Field(..., description="Instance identifier (same as name)") + name: str = Field(..., description="Instance name") + type: InstanceType = Field(..., description="Instance type (VM or CONTAINER)") + status: InstanceStatus = Field(..., description="Current runtime status") + cpu: str = Field(default="2", description="Number of CPU cores as string") + memory: int = Field( + default=4294967296, + description="Memory in bytes (default 4GB)" + ) + autostart: bool = Field(default=True, description="Start on boot") + image: Optional[str] = Field(None, description="Base image used") + environment: Dict[str, str] = Field( + default_factory=dict, + description="Environment variables" + ) + devices: List[InstanceDevice] = Field( + default_factory=list, + description="Attached devices" + ) + nics: List[InstanceNIC] = Field( + default_factory=list, + description="Network interfaces" + ) + raw: Optional[Dict[str, Any]] = Field( + None, + description="Raw API response for additional fields" + ) + + +class InstanceSummary(BaseModel): + """ + Summary information about an instance for list operations + """ + id: str = Field(..., description="Instance ID") + name: str = Field(..., description="Instance name") + type: str = Field(..., description="Instance type (VM/CONTAINER)") + status: str = Field(..., description="Current status") + cpu: str = Field(..., description="CPU allocation") + memory_gb: float = Field(..., description="Memory in GB") + autostart: bool = Field(..., description="Autostart enabled") + + +class InstanceUpdateRequest(BaseModel): + """ + Request model for updating instance configuration + + Only the fields provided will be updated. + """ + cpu: Optional[str] = Field(None, description="Number of CPU cores") + memory: Optional[int] = Field(None, description="Memory in bytes") + autostart: Optional[bool] = Field(None, description="Start on boot") + environment: Optional[Dict[str, str]] = Field( + None, + description="Environment variables" + ) diff --git a/truenas_mcp_server/models/vm.py b/truenas_mcp_server/models/vm.py new file mode 100644 index 0000000..0cc22e8 --- /dev/null +++ b/truenas_mcp_server/models/vm.py @@ -0,0 +1,130 @@ +""" +TrueNAS Legacy VM models for bhyve-based virtual machines + +TrueNAS SCALE uses bhyve for legacy VMs via /api/v2.0/vm. +New deployments should use Incus VMs (/api/v2.0/virt/instance) instead. +""" + +from enum import Enum +from typing import Any, Dict, List, Optional +from pydantic import Field + +from .base import BaseModel + + +class VMStatus(str, Enum): + """Legacy VM runtime states""" + RUNNING = "RUNNING" + STOPPED = "STOPPED" + SUSPENDED = "SUSPENDED" + + +class VMBootloader(str, Enum): + """VM bootloader types""" + UEFI = "UEFI" + UEFI_CSM = "UEFI_CSM" + + +class VMDeviceType(str, Enum): + """Types of devices in legacy VMs""" + DISK = "DISK" + CDROM = "CDROM" + NIC = "NIC" + RAW = "RAW" + DISPLAY = "DISPLAY" + USB = "USB" + + +class VMDevice(BaseModel): + """ + Device attached to a legacy VM + """ + id: int = Field(..., description="Device ID") + dtype: VMDeviceType = Field(..., description="Device type") + order: int = Field(default=1000, description="Device order") + vm: int = Field(..., description="Parent VM ID") + attributes: Dict[str, Any] = Field( + default_factory=dict, + description="Device-specific attributes" + ) + + +class LegacyVM(BaseModel): + """ + Legacy bhyve VM model + + Represents a virtual machine managed by bhyve on TrueNAS. + Note: New deployments should use Incus VMs instead. + """ + id: int = Field(..., description="VM numeric ID") + name: str = Field(..., description="VM name") + description: Optional[str] = Field(None, description="VM description") + vcpus: int = Field(default=1, description="Number of virtual CPUs") + memory: int = Field( + default=512, + description="Memory in MB" + ) + min_memory: Optional[int] = Field(None, description="Minimum memory for ballooning") + autostart: bool = Field(default=False, description="Start on boot") + time: str = Field(default="LOCAL", description="Time sync (LOCAL/UTC)") + bootloader: VMBootloader = Field( + default=VMBootloader.UEFI, + description="Bootloader type" + ) + bootloader_ovmf: Optional[str] = Field(None, description="Custom OVMF path") + shutdown_timeout: int = Field( + default=90, + description="Shutdown timeout in seconds" + ) + cpu_mode: str = Field(default="CUSTOM", description="CPU emulation mode") + cpu_model: Optional[str] = Field(None, description="CPU model to emulate") + hide_from_msr: bool = Field( + default=False, + description="Hide hypervisor from guest" + ) + hyperv_enlightenments: bool = Field( + default=False, + description="Enable Hyper-V enlightenments" + ) + ensure_display_device: bool = Field( + default=True, + description="Ensure display device exists" + ) + arch_type: Optional[str] = Field(None, description="Architecture type") + machine_type: Optional[str] = Field(None, description="Machine type") + uuid: Optional[str] = Field(None, description="VM UUID") + status: Optional[VMStatus] = Field(None, description="Current status") + devices: List[VMDevice] = Field( + default_factory=list, + description="Attached devices" + ) + + +class LegacyVMSummary(BaseModel): + """ + Summary information about a legacy VM for list operations + """ + id: int = Field(..., description="VM ID") + name: str = Field(..., description="VM name") + status: str = Field(..., description="Current status") + vcpus: int = Field(..., description="Number of vCPUs") + memory_mb: int = Field(..., description="Memory in MB") + autostart: bool = Field(..., description="Autostart enabled") + description: Optional[str] = Field(None, description="VM description") + + +class LegacyVMUpdateRequest(BaseModel): + """ + Request model for updating legacy VM configuration + + Only the fields provided will be updated. + """ + name: Optional[str] = Field(None, description="VM name") + description: Optional[str] = Field(None, description="VM description") + vcpus: Optional[int] = Field(None, description="Number of vCPUs") + memory: Optional[int] = Field(None, description="Memory in MB") + autostart: Optional[bool] = Field(None, description="Start on boot") + shutdown_timeout: Optional[int] = Field( + None, + description="Shutdown timeout in seconds" + ) diff --git a/truenas_mcp_server/server.py b/truenas_mcp_server/server.py index af08c54..b78178a 100644 --- a/truenas_mcp_server/server.py +++ b/truenas_mcp_server/server.py @@ -1,21 +1,29 @@ """ Main TrueNAS MCP Server implementation + +Supports both TrueNAS Core and TrueNAS SCALE with automatic variant detection. +SCALE-specific features (Apps, Incus, VMs) are only registered when connected +to a SCALE system. """ import sys import logging import asyncio -from typing import Optional +from typing import Optional, List, Type from mcp.server.fastmcp import FastMCP from .config import get_settings -from .client import get_client, close_client +from .client import get_client, close_client, TrueNASVariant from .tools import ( + BaseTool, DebugTools, UserTools, StorageTools, SharingTools, - SnapshotTools + SnapshotTools, + AppTools, + InstanceTools, + LegacyVMTools, ) # Configure logging @@ -29,23 +37,41 @@ class TrueNASMCPServer: """ Main TrueNAS MCP Server class - - Manages the MCP server instance and registers all available tools + + Manages the MCP server instance and registers all available tools. + Automatically detects TrueNAS variant (Core vs SCALE) and registers + appropriate tools. """ - + + # Tools that work on both TrueNAS Core and SCALE + UNIVERSAL_TOOLS: List[Type[BaseTool]] = [ + UserTools, + StorageTools, + SharingTools, + SnapshotTools, + ] + + # Tools that only work on TrueNAS SCALE + SCALE_ONLY_TOOLS: List[Type[BaseTool]] = [ + AppTools, # Docker Compose apps (SCALE 24.04+) + InstanceTools, # Incus VMs/containers (SCALE 25.04+) + LegacyVMTools, # bhyve VMs (SCALE, differs from Core) + ] + def __init__(self, name: str = "TrueNAS MCP Server"): """ Initialize the TrueNAS MCP Server - + Args: name: Server name for MCP """ self.name = name self.settings = get_settings() self.mcp = FastMCP(name) - self.tools = [] + self.tools: List[BaseTool] = [] + self.variant: TrueNASVariant = TrueNASVariant.UNKNOWN self._setup_logging() - self._register_tools() + # Note: Tools are registered during initialize() after variant detection def _setup_logging(self): """Configure logging based on settings""" @@ -57,28 +83,42 @@ def _setup_logging(self): logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) - def _register_tools(self): - """Register all available tools with the MCP server""" - logger.info("Registering MCP tools...") - - # Initialize tool instances - tool_classes = [ - UserTools, - StorageTools, - SharingTools, - SnapshotTools - ] - + def _register_tools(self, variant: TrueNASVariant): + """ + Register tools with the MCP server based on TrueNAS variant + + Args: + variant: Detected TrueNAS variant (Core, SCALE, or Unknown) + """ + logger.info(f"Registering MCP tools for TrueNAS {variant.value}...") + + # Start with universal tools that work on both Core and SCALE + tool_classes: List[Type[BaseTool]] = list(self.UNIVERSAL_TOOLS) + + # Add SCALE-only tools if connected to SCALE + if variant == TrueNASVariant.SCALE: + tool_classes.extend(self.SCALE_ONLY_TOOLS) + logger.info("Including SCALE-specific tools (Apps, Instances, VMs)") + elif variant == TrueNASVariant.CORE: + logger.info("Skipping SCALE-only tools (Apps, Instances, VMs) - Core detected") + else: + # Unknown variant - include all tools but warn + tool_classes.extend(self.SCALE_ONLY_TOOLS) + logger.warning( + "TrueNAS variant unknown - registering all tools. " + "SCALE-only tools may fail on TrueNAS Core." + ) + # Add debug tools if enabled if self.settings.enable_debug_tools or self.settings.is_development(): tool_classes.insert(0, DebugTools) - + # Register each tool class for tool_class in tool_classes: tool_instance = tool_class() self.tools.append(tool_instance) self._register_tool_methods(tool_instance) - + logger.info(f"Registered {len(self.tools)} tool categories") def _register_tool_methods(self, tool_instance): @@ -92,21 +132,31 @@ def _register_tool_methods(self, tool_instance): logger.debug(f"Registered tool: {method_name}") async def initialize(self): - """Initialize the server and all tools""" + """Initialize the server, detect variant, and register tools""" logger.info(f"Initializing {self.name}...") - + try: # Initialize the client client = await get_client() - + + # Detect TrueNAS variant (Core vs SCALE) + self.variant = await client.detect_variant() + logger.info( + f"Connected to TrueNAS {self.variant.value.upper()} " + f"version {client.version or 'unknown'}" + ) + + # Register tools based on detected variant + self._register_tools(self.variant) + # Initialize all tools for tool in self.tools: tool.client = client tool.settings = self.settings await tool.initialize() - + logger.info(f"{self.name} initialized successfully") - + except Exception as e: logger.error(f"Failed to initialize server: {e}") raise diff --git a/truenas_mcp_server/tools/__init__.py b/truenas_mcp_server/tools/__init__.py index 52538a8..1be5db5 100644 --- a/truenas_mcp_server/tools/__init__.py +++ b/truenas_mcp_server/tools/__init__.py @@ -8,6 +8,9 @@ from .storage import StorageTools from .sharing import SharingTools from .snapshots import SnapshotTools +from .apps import AppTools +from .instances import InstanceTools +from .vms import LegacyVMTools __all__ = [ "BaseTool", @@ -17,4 +20,8 @@ "StorageTools", "SharingTools", "SnapshotTools", + # Virtualization tools + "AppTools", + "InstanceTools", + "LegacyVMTools", ] \ No newline at end of file diff --git a/truenas_mcp_server/tools/apps.py b/truenas_mcp_server/tools/apps.py new file mode 100644 index 0000000..e6fcdd9 --- /dev/null +++ b/truenas_mcp_server/tools/apps.py @@ -0,0 +1,410 @@ +""" +TrueNAS App management tools + +Provides tools for managing Docker Compose-based applications on TrueNAS SCALE. +Uses the /api/v2.0/app endpoints. + +API Quirks: +- POST /app/config expects a plain string body: "app_name" (not an object!) +- POST /app/start expects: {"app_name": "name"} (an object) +- POST /app/stop expects: {"app_name": "name"} (an object) +- App operations are async and return job IDs +""" + +import asyncio +from typing import Any, Dict, List, Optional + +from .base import BaseTool, tool_handler + + +class AppTools(BaseTool): + """Tools for managing TrueNAS apps (Docker Compose applications)""" + + # Timeout for app operations (start/stop can take time) + APP_OPERATION_TIMEOUT = 120 # seconds + POLL_INTERVAL = 5 # seconds + + def get_tool_definitions(self) -> list: + """Get tool definitions for app management""" + return [ + ("list_apps", self.list_apps, "List all TrueNAS apps with status", {}), + ("get_app", self.get_app, "Get detailed information about a specific app", + {"app_name": {"type": "string", "required": True, + "description": "Name of the app"}}), + ("get_app_config", self.get_app_config, + "Get the full configuration of an app", + {"app_name": {"type": "string", "required": True, + "description": "Name of the app"}}), + ("start_app", self.start_app, "Start an app", + {"app_name": {"type": "string", "required": True, + "description": "Name of the app to start"}}), + ("stop_app", self.stop_app, "Stop an app", + {"app_name": {"type": "string", "required": True, + "description": "Name of the app to stop"}, + "force": {"type": "boolean", "required": False, + "description": "Force stop without waiting (default: false)"}}), + ("restart_app", self.restart_app, "Restart an app (stop then start)", + {"app_name": {"type": "string", "required": True, + "description": "Name of the app to restart"}}), + ("redeploy_app", self.redeploy_app, + "Redeploy an app after configuration changes", + {"app_name": {"type": "string", "required": True, + "description": "Name of the app to redeploy"}}), + ("update_app_config", self.update_app_config, + "Update app configuration (resource limits only in hostvars mode)", + {"app_name": {"type": "string", "required": True, + "description": "Name of the app to update"}, + "values": {"type": "object", "required": True, + "description": "Configuration values to update"}}), + ] + + @tool_handler + async def list_apps(self) -> Dict[str, Any]: + """ + List all TrueNAS apps with their status + + Returns: + Dictionary containing list of apps with status and metadata + """ + await self.ensure_initialized() + + apps = await self.client.get("/app") + + app_list = [] + for app in apps: + # Extract portal URL if available + portal_url = None + portal = app.get("portal", {}) + if portal and isinstance(portal, dict): + # Get first portal entry + for portal_info in portal.values(): + if isinstance(portal_info, str): + portal_url = portal_info + break + elif isinstance(portal_info, dict): + portal_url = portal_info.get("url") + break + + app_info = { + "name": app.get("id") or app.get("name"), + "state": app.get("state", "UNKNOWN"), + "version": app.get("version"), + "human_version": app.get("human_version"), + "upgrade_available": app.get("upgrade_available", False), + "portal_url": portal_url, + } + app_list.append(app_info) + + # Count by state + state_counts = {} + for app in app_list: + state = app["state"] + state_counts[state] = state_counts.get(state, 0) + 1 + + return { + "success": True, + "apps": app_list, + "metadata": { + "total_apps": len(app_list), + "state_counts": state_counts, + } + } + + @tool_handler + async def get_app(self, app_name: str) -> Dict[str, Any]: + """ + Get detailed information about a specific app + + Args: + app_name: Name of the app + + Returns: + Dictionary containing app details + """ + await self.ensure_initialized() + + try: + app = await self.client.get(f"/app/id/{app_name}") + except Exception: + # Try getting all apps and finding by name + apps = await self.client.get("/app") + app = None + for a in apps: + if a.get("id") == app_name or a.get("name") == app_name: + app = a + break + + if not app: + return { + "success": False, + "error": f"App '{app_name}' not found" + } + + return { + "success": True, + "app": { + "name": app.get("id") or app.get("name"), + "state": app.get("state"), + "version": app.get("version"), + "human_version": app.get("human_version"), + "upgrade_available": app.get("upgrade_available", False), + "portal": app.get("portal"), + "metadata": app.get("metadata", {}), + }, + "raw": app # Include full response for debugging + } + + @tool_handler + async def get_app_config(self, app_name: str) -> Dict[str, Any]: + """ + Get the full configuration of an app + + NOTE: The /app/config endpoint expects a plain string body! + + Args: + app_name: Name of the app + + Returns: + Dictionary containing the full app configuration + """ + await self.ensure_initialized() + + # NOTE: This endpoint expects a plain quoted string, not an object! + config = await self.client.post_raw("/app/config", f'"{app_name}"') + + return { + "success": True, + "app_name": app_name, + "config": config + } + + @tool_handler + async def start_app(self, app_name: str) -> Dict[str, Any]: + """ + Start an app + + Args: + app_name: Name of the app to start + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check current state first + try: + app = await self.client.get(f"/app/id/{app_name}") + current_state = app.get("state", "UNKNOWN") + + if current_state == "RUNNING": + return { + "success": True, + "message": f"App '{app_name}' is already running", + "state": current_state + } + except Exception: + pass # Continue with start attempt + + # Start the app + result = await self.client.post("/app/start", {"app_name": app_name}) + + # Poll for completion + final_state = await self._wait_for_app_state(app_name, "RUNNING") + + return { + "success": final_state == "RUNNING", + "app_name": app_name, + "state": final_state, + "message": f"App '{app_name}' started successfully" if final_state == "RUNNING" + else f"App '{app_name}' may still be starting (current state: {final_state})" + } + + @tool_handler + async def stop_app( + self, + app_name: str, + force: bool = False + ) -> Dict[str, Any]: + """ + Stop an app + + Args: + app_name: Name of the app to stop + force: Force stop without waiting + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check current state first + try: + app = await self.client.get(f"/app/id/{app_name}") + current_state = app.get("state", "UNKNOWN") + + if current_state == "STOPPED": + return { + "success": True, + "message": f"App '{app_name}' is already stopped", + "state": current_state + } + except Exception: + pass + + # Stop the app + body = {"app_name": app_name} + if force: + body["force"] = True + + result = await self.client.post("/app/stop", body) + + if force: + return { + "success": True, + "app_name": app_name, + "message": f"Force stop initiated for app '{app_name}'" + } + + # Poll for completion + final_state = await self._wait_for_app_state(app_name, "STOPPED") + + return { + "success": final_state == "STOPPED", + "app_name": app_name, + "state": final_state, + "message": f"App '{app_name}' stopped successfully" if final_state == "STOPPED" + else f"App '{app_name}' may still be stopping (current state: {final_state})" + } + + @tool_handler + async def restart_app(self, app_name: str) -> Dict[str, Any]: + """ + Restart an app (stop then start) + + Args: + app_name: Name of the app to restart + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Stop the app first + stop_result = await self.client.post("/app/stop", {"app_name": app_name}) + await self._wait_for_app_state(app_name, "STOPPED") + + # Start the app + start_result = await self.client.post("/app/start", {"app_name": app_name}) + final_state = await self._wait_for_app_state(app_name, "RUNNING") + + return { + "success": final_state == "RUNNING", + "app_name": app_name, + "state": final_state, + "message": f"App '{app_name}' restarted successfully" if final_state == "RUNNING" + else f"App '{app_name}' restart may still be in progress" + } + + @tool_handler + async def redeploy_app(self, app_name: str) -> Dict[str, Any]: + """ + Redeploy an app after configuration changes + + This pulls the latest container images and recreates containers. + + Args: + app_name: Name of the app to redeploy + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Redeploy endpoint + result = await self.client.post("/app/redeploy", {"app_name": app_name}) + + # Poll for running state + final_state = await self._wait_for_app_state(app_name, "RUNNING") + + return { + "success": final_state == "RUNNING", + "app_name": app_name, + "state": final_state, + "message": f"App '{app_name}' redeployed successfully" if final_state == "RUNNING" + else f"App '{app_name}' redeploy may still be in progress" + } + + @tool_handler + async def update_app_config( + self, + app_name: str, + values: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Update app configuration + + Args: + app_name: Name of the app to update + values: Configuration values to update (e.g., {"resources": {"limits": {"cpus": 2, "memory": 4096}}}) + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Update the app configuration + result = await self.client.put( + f"/app/id/{app_name}", + {"values": values} + ) + + return { + "success": True, + "app_name": app_name, + "message": f"App '{app_name}' configuration updated", + "updated_values": values + } + + async def _wait_for_app_state( + self, + app_name: str, + target_state: str, + timeout: Optional[int] = None + ) -> str: + """ + Wait for an app to reach a target state + + Args: + app_name: Name of the app + target_state: State to wait for (e.g., "RUNNING", "STOPPED") + timeout: Optional timeout in seconds (default: APP_OPERATION_TIMEOUT) + + Returns: + Final state of the app + """ + timeout = timeout or self.APP_OPERATION_TIMEOUT + max_attempts = timeout // self.POLL_INTERVAL + + for _ in range(max_attempts): + try: + app = await self.client.get(f"/app/id/{app_name}") + current_state = app.get("state", "UNKNOWN") + + if current_state == target_state: + return current_state + + # If we're in an error state, return immediately + if current_state in ("CRASHED", "ERROR"): + return current_state + + except Exception as e: + self.logger.warning(f"Error polling app state: {e}") + + await asyncio.sleep(self.POLL_INTERVAL) + + # Return last known state + try: + app = await self.client.get(f"/app/id/{app_name}") + return app.get("state", "UNKNOWN") + except Exception: + return "UNKNOWN" diff --git a/truenas_mcp_server/tools/instances.py b/truenas_mcp_server/tools/instances.py new file mode 100644 index 0000000..d474906 --- /dev/null +++ b/truenas_mcp_server/tools/instances.py @@ -0,0 +1,522 @@ +""" +TrueNAS Incus Instance management tools + +Provides tools for managing Incus VMs and Containers on TrueNAS SCALE. +Uses the /api/v2.0/virt/instance endpoints. + +API Quirks (IMPORTANT!): +- POST /virt/instance/start expects a plain string body: "instance_name" (NOT an object!) +- POST /virt/instance/stop expects: {"id": "instance_name"} (an object) +- PUT /virt/instance/id/{id} expects: {"cpu": "2", "memory": 4294967296, ...} +- Instance operations are async and return job IDs +- Memory is specified in bytes (not MB!) +""" + +import asyncio +from typing import Any, Dict, List, Optional + +from .base import BaseTool, tool_handler + + +class InstanceTools(BaseTool): + """Tools for managing TrueNAS Incus instances (VMs and Containers)""" + + # Timeout for instance operations + INSTANCE_OPERATION_TIMEOUT = 120 # seconds + POLL_INTERVAL = 5 # seconds + + def get_tool_definitions(self) -> list: + """Get tool definitions for instance management""" + return [ + ("list_instances", self.list_instances, + "List all Incus instances (VMs and Containers)", + {"instance_type": {"type": "string", "required": False, + "description": "Filter by type: 'VM' or 'CONTAINER' (optional)"}}), + ("get_instance", self.get_instance, + "Get detailed information about a specific instance", + {"instance_name": {"type": "string", "required": True, + "description": "Name of the instance"}}), + ("start_instance", self.start_instance, "Start an Incus instance", + {"instance_name": {"type": "string", "required": True, + "description": "Name of the instance to start"}}), + ("stop_instance", self.stop_instance, "Stop an Incus instance", + {"instance_name": {"type": "string", "required": True, + "description": "Name of the instance to stop"}, + "force": {"type": "boolean", "required": False, + "description": "Force stop without graceful shutdown"}, + "timeout": {"type": "integer", "required": False, + "description": "Timeout in seconds for graceful shutdown"}}), + ("restart_instance", self.restart_instance, "Restart an Incus instance", + {"instance_name": {"type": "string", "required": True, + "description": "Name of the instance to restart"}}), + ("update_instance", self.update_instance, + "Update instance configuration (CPU, memory, autostart)", + {"instance_name": {"type": "string", "required": True, + "description": "Name of the instance to update"}, + "cpu": {"type": "string", "required": False, + "description": "Number of CPU cores (as string, e.g., '4')"}, + "memory": {"type": "integer", "required": False, + "description": "Memory in bytes (e.g., 8589934592 for 8GB)"}, + "autostart": {"type": "boolean", "required": False, + "description": "Whether to start instance on boot"}}), + ("list_instance_devices", self.list_instance_devices, + "List devices attached to an instance", + {"instance_name": {"type": "string", "required": True, + "description": "Name of the instance"}}), + ] + + @tool_handler + async def list_instances( + self, + instance_type: Optional[str] = None + ) -> Dict[str, Any]: + """ + List all Incus instances (VMs and Containers) + + Args: + instance_type: Optional filter by type ('VM' or 'CONTAINER') + + Returns: + Dictionary containing list of instances with status + """ + await self.ensure_initialized() + + instances = await self.client.get("/virt/instance") + + instance_list = [] + for inst in instances: + # Filter by type if specified + inst_type = inst.get("type", "UNKNOWN") + if instance_type and inst_type != instance_type.upper(): + continue + + # Convert memory to GB for readability + memory_bytes = inst.get("memory", 0) + memory_gb = round(memory_bytes / (1024**3), 2) + + instance_info = { + "id": inst.get("id"), + "name": inst.get("name") or inst.get("id"), + "type": inst_type, + "status": inst.get("status", "UNKNOWN"), + "cpu": inst.get("cpu", "0"), + "memory_gb": memory_gb, + "memory_bytes": memory_bytes, + "autostart": inst.get("autostart", False), + "image": inst.get("image"), + } + instance_list.append(instance_info) + + # Count by type and status + type_counts = {} + status_counts = {} + for inst in instance_list: + inst_type = inst["type"] + type_counts[inst_type] = type_counts.get(inst_type, 0) + 1 + + status = inst["status"] + status_counts[status] = status_counts.get(status, 0) + 1 + + return { + "success": True, + "instances": instance_list, + "metadata": { + "total_instances": len(instance_list), + "type_counts": type_counts, + "status_counts": status_counts, + } + } + + @tool_handler + async def get_instance(self, instance_name: str) -> Dict[str, Any]: + """ + Get detailed information about a specific instance + + Args: + instance_name: Name of the instance + + Returns: + Dictionary containing instance details + """ + await self.ensure_initialized() + + # Query with id filter + instances = await self.client.get(f"/virt/instance?id={instance_name}") + + if not instances: + return { + "success": False, + "error": f"Instance '{instance_name}' not found" + } + + inst = instances[0] + memory_bytes = inst.get("memory", 0) + memory_gb = round(memory_bytes / (1024**3), 2) + + return { + "success": True, + "instance": { + "id": inst.get("id"), + "name": inst.get("name") or inst.get("id"), + "type": inst.get("type"), + "status": inst.get("status"), + "cpu": inst.get("cpu"), + "memory_gb": memory_gb, + "memory_bytes": memory_bytes, + "autostart": inst.get("autostart"), + "image": inst.get("image"), + "environment": inst.get("environment", {}), + }, + "raw": inst # Include full response for debugging + } + + @tool_handler + async def start_instance(self, instance_name: str) -> Dict[str, Any]: + """ + Start an Incus instance + + NOTE: The /virt/instance/start endpoint expects a plain string body! + + Args: + instance_name: Name of the instance to start + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check current state first + instances = await self.client.get(f"/virt/instance?id={instance_name}") + if not instances: + return { + "success": False, + "error": f"Instance '{instance_name}' not found" + } + + current_status = instances[0].get("status", "UNKNOWN") + if current_status == "RUNNING": + return { + "success": True, + "message": f"Instance '{instance_name}' is already running", + "status": current_status + } + + # Start the instance + # NOTE: This endpoint expects a plain quoted string, not an object! + result = await self.client.post_raw( + "/virt/instance/start", + f'"{instance_name}"' + ) + + # Poll for completion + final_status = await self._wait_for_instance_status( + instance_name, "RUNNING" + ) + + return { + "success": final_status == "RUNNING", + "instance_name": instance_name, + "status": final_status, + "message": ( + f"Instance '{instance_name}' started successfully" + if final_status == "RUNNING" + else f"Instance '{instance_name}' may still be starting " + f"(current status: {final_status})" + ) + } + + @tool_handler + async def stop_instance( + self, + instance_name: str, + force: bool = False, + timeout: Optional[int] = None + ) -> Dict[str, Any]: + """ + Stop an Incus instance + + Args: + instance_name: Name of the instance to stop + force: Force stop without graceful shutdown + timeout: Timeout in seconds for graceful shutdown + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check current state first + instances = await self.client.get(f"/virt/instance?id={instance_name}") + if not instances: + return { + "success": False, + "error": f"Instance '{instance_name}' not found" + } + + current_status = instances[0].get("status", "UNKNOWN") + if current_status == "STOPPED": + return { + "success": True, + "message": f"Instance '{instance_name}' is already stopped", + "status": current_status + } + + # Build request body + body: Dict[str, Any] = {"id": instance_name} + if force: + body["force"] = True + if timeout: + body["timeout"] = timeout + + # Stop the instance + result = await self.client.post("/virt/instance/stop", body) + + # Poll for completion + final_status = await self._wait_for_instance_status( + instance_name, "STOPPED" + ) + + return { + "success": final_status == "STOPPED", + "instance_name": instance_name, + "status": final_status, + "message": ( + f"Instance '{instance_name}' stopped successfully" + if final_status == "STOPPED" + else f"Instance '{instance_name}' may still be stopping " + f"(current status: {final_status})" + ) + } + + @tool_handler + async def restart_instance(self, instance_name: str) -> Dict[str, Any]: + """ + Restart an Incus instance (stop then start) + + Args: + instance_name: Name of the instance to restart + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check if instance exists + instances = await self.client.get(f"/virt/instance?id={instance_name}") + if not instances: + return { + "success": False, + "error": f"Instance '{instance_name}' not found" + } + + initial_status = instances[0].get("status", "UNKNOWN") + + # Stop if running + if initial_status == "RUNNING": + await self.client.post("/virt/instance/stop", {"id": instance_name}) + await self._wait_for_instance_status(instance_name, "STOPPED") + + # Start the instance + await self.client.post_raw("/virt/instance/start", f'"{instance_name}"') + final_status = await self._wait_for_instance_status( + instance_name, "RUNNING" + ) + + return { + "success": final_status == "RUNNING", + "instance_name": instance_name, + "status": final_status, + "message": ( + f"Instance '{instance_name}' restarted successfully" + if final_status == "RUNNING" + else f"Instance '{instance_name}' restart may still be in progress" + ) + } + + @tool_handler + async def update_instance( + self, + instance_name: str, + cpu: Optional[str] = None, + memory: Optional[int] = None, + autostart: Optional[bool] = None + ) -> Dict[str, Any]: + """ + Update instance configuration + + NOTE: The instance should typically be stopped before updating. + + Args: + instance_name: Name of the instance to update + cpu: Number of CPU cores (as string, e.g., '4') + memory: Memory in bytes (e.g., 8589934592 for 8GB) + autostart: Whether to start instance on boot + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check if instance exists + instances = await self.client.get(f"/virt/instance?id={instance_name}") + if not instances: + return { + "success": False, + "error": f"Instance '{instance_name}' not found" + } + + current = instances[0] + was_running = current.get("status") == "RUNNING" + + # Build update body - only include provided fields + update_body: Dict[str, Any] = {} + if cpu is not None: + update_body["cpu"] = cpu + if memory is not None: + update_body["memory"] = memory + if autostart is not None: + update_body["autostart"] = autostart + + if not update_body: + return { + "success": False, + "error": "No update parameters provided" + } + + # Update the instance + result = await self.client.put( + f"/virt/instance/id/{instance_name}", + update_body + ) + + # Get updated state + instances = await self.client.get(f"/virt/instance?id={instance_name}") + updated = instances[0] if instances else {} + + return { + "success": True, + "instance_name": instance_name, + "message": f"Instance '{instance_name}' configuration updated", + "updated_values": update_body, + "current_config": { + "cpu": updated.get("cpu"), + "memory": updated.get("memory"), + "autostart": updated.get("autostart"), + }, + "note": ( + "Instance may need restart for changes to take effect" + if was_running else None + ) + } + + @tool_handler + async def list_instance_devices(self, instance_name: str) -> Dict[str, Any]: + """ + List devices attached to an instance + + Args: + instance_name: Name of the instance + + Returns: + Dictionary containing list of attached devices + """ + await self.ensure_initialized() + + # Get instance details + instances = await self.client.get(f"/virt/instance?id={instance_name}") + if not instances: + return { + "success": False, + "error": f"Instance '{instance_name}' not found" + } + + inst = instances[0] + + # Extract device information + # TrueNAS returns devices in various formats depending on version + devices = [] + + # Try different device sources + raw_devices = inst.get("devices", {}) + if isinstance(raw_devices, dict): + for dev_name, dev_config in raw_devices.items(): + if isinstance(dev_config, dict): + devices.append({ + "name": dev_name, + "type": dev_config.get("type", "UNKNOWN"), + "source": dev_config.get("source"), + "path": dev_config.get("path"), + "readonly": dev_config.get("readonly", False), + "config": dev_config + }) + elif isinstance(raw_devices, list): + for dev in raw_devices: + devices.append({ + "name": dev.get("name", "unknown"), + "type": dev.get("type", "UNKNOWN"), + "source": dev.get("source"), + "path": dev.get("path"), + "readonly": dev.get("readonly", False), + "config": dev + }) + + return { + "success": True, + "instance_name": instance_name, + "devices": devices, + "metadata": { + "device_count": len(devices) + } + } + + async def _wait_for_instance_status( + self, + instance_name: str, + target_status: str, + timeout: Optional[int] = None + ) -> str: + """ + Wait for an instance to reach a target status + + Args: + instance_name: Name of the instance + target_status: Status to wait for (e.g., "RUNNING", "STOPPED") + timeout: Optional timeout in seconds + + Returns: + Final status of the instance + """ + timeout = timeout or self.INSTANCE_OPERATION_TIMEOUT + max_attempts = timeout // self.POLL_INTERVAL + + for _ in range(max_attempts): + try: + instances = await self.client.get( + f"/virt/instance?id={instance_name}" + ) + if instances: + current_status = instances[0].get("status", "UNKNOWN") + + if current_status == target_status: + return current_status + + # If we hit an error state, return immediately + if current_status == "ERROR": + return current_status + + except Exception as e: + self.logger.warning(f"Error polling instance status: {e}") + + await asyncio.sleep(self.POLL_INTERVAL) + + # Return last known status + try: + instances = await self.client.get( + f"/virt/instance?id={instance_name}" + ) + if instances: + return instances[0].get("status", "UNKNOWN") + except Exception: + pass + + return "UNKNOWN" diff --git a/truenas_mcp_server/tools/vms.py b/truenas_mcp_server/tools/vms.py new file mode 100644 index 0000000..0ed754b --- /dev/null +++ b/truenas_mcp_server/tools/vms.py @@ -0,0 +1,481 @@ +""" +TrueNAS Legacy VM management tools (bhyve-based) + +Provides tools for managing legacy bhyve VMs on TrueNAS SCALE. +Uses the /api/v2.0/vm endpoints. + +NOTE: For new deployments, consider using Incus VMs (/api/v2.0/virt/instance) +instead. Legacy VMs are maintained for backward compatibility. +""" + +import asyncio +from typing import Any, Dict, List, Optional + +from .base import BaseTool, tool_handler + + +class LegacyVMTools(BaseTool): + """Tools for managing TrueNAS legacy bhyve VMs""" + + # Timeout for VM operations + VM_OPERATION_TIMEOUT = 120 # seconds + POLL_INTERVAL = 5 # seconds + + def get_tool_definitions(self) -> list: + """Get tool definitions for legacy VM management""" + return [ + ("list_legacy_vms", self.list_legacy_vms, + "List all legacy bhyve VMs", {}), + ("get_legacy_vm", self.get_legacy_vm, + "Get detailed information about a legacy VM", + {"vm_id": {"type": "integer", "required": True, + "description": "Numeric ID of the VM"}}), + ("start_legacy_vm", self.start_legacy_vm, + "Start a legacy VM", + {"vm_id": {"type": "integer", "required": True, + "description": "Numeric ID of the VM to start"}}), + ("stop_legacy_vm", self.stop_legacy_vm, + "Stop a legacy VM", + {"vm_id": {"type": "integer", "required": True, + "description": "Numeric ID of the VM to stop"}, + "force": {"type": "boolean", "required": False, + "description": "Force stop (poweroff) without graceful shutdown"}}), + ("restart_legacy_vm", self.restart_legacy_vm, + "Restart a legacy VM", + {"vm_id": {"type": "integer", "required": True, + "description": "Numeric ID of the VM to restart"}}), + ("update_legacy_vm", self.update_legacy_vm, + "Update legacy VM configuration", + {"vm_id": {"type": "integer", "required": True, + "description": "Numeric ID of the VM to update"}, + "name": {"type": "string", "required": False, + "description": "New VM name"}, + "vcpus": {"type": "integer", "required": False, + "description": "Number of virtual CPUs"}, + "memory": {"type": "integer", "required": False, + "description": "Memory in MB"}, + "autostart": {"type": "boolean", "required": False, + "description": "Start on boot"}}), + ("get_legacy_vm_status", self.get_legacy_vm_status, + "Get the runtime status of a legacy VM", + {"vm_id": {"type": "integer", "required": True, + "description": "Numeric ID of the VM"}}), + ] + + @tool_handler + async def list_legacy_vms(self) -> Dict[str, Any]: + """ + List all legacy bhyve VMs + + Returns: + Dictionary containing list of VMs with their status + """ + await self.ensure_initialized() + + vms = await self.client.get("/vm") + + vm_list = [] + for vm in vms: + # Get status for each VM + vm_id = vm.get("id") + status = await self._get_vm_status(vm_id) + + vm_info = { + "id": vm_id, + "name": vm.get("name"), + "description": vm.get("description"), + "vcpus": vm.get("vcpus", 1), + "memory_mb": vm.get("memory", 0), + "autostart": vm.get("autostart", False), + "status": status, + "bootloader": vm.get("bootloader"), + } + vm_list.append(vm_info) + + # Count by status + status_counts = {} + for vm in vm_list: + status = vm["status"] + status_counts[status] = status_counts.get(status, 0) + 1 + + return { + "success": True, + "vms": vm_list, + "metadata": { + "total_vms": len(vm_list), + "status_counts": status_counts, + } + } + + @tool_handler + async def get_legacy_vm(self, vm_id: int) -> Dict[str, Any]: + """ + Get detailed information about a legacy VM + + Args: + vm_id: Numeric ID of the VM + + Returns: + Dictionary containing VM details + """ + await self.ensure_initialized() + + try: + vm = await self.client.get(f"/vm/id/{vm_id}") + except Exception: + return { + "success": False, + "error": f"VM with ID {vm_id} not found" + } + + # Get runtime status + status = await self._get_vm_status(vm_id) + + # Parse device information + devices = [] + for device in vm.get("devices", []): + devices.append({ + "id": device.get("id"), + "type": device.get("dtype"), + "order": device.get("order", 1000), + "attributes": device.get("attributes", {}), + }) + + return { + "success": True, + "vm": { + "id": vm.get("id"), + "name": vm.get("name"), + "description": vm.get("description"), + "vcpus": vm.get("vcpus"), + "memory_mb": vm.get("memory"), + "min_memory": vm.get("min_memory"), + "autostart": vm.get("autostart"), + "bootloader": vm.get("bootloader"), + "time": vm.get("time"), + "shutdown_timeout": vm.get("shutdown_timeout"), + "cpu_mode": vm.get("cpu_mode"), + "cpu_model": vm.get("cpu_model"), + "status": status, + "devices": devices, + }, + "raw": vm # Include full response for debugging + } + + @tool_handler + async def start_legacy_vm(self, vm_id: int) -> Dict[str, Any]: + """ + Start a legacy VM + + Args: + vm_id: Numeric ID of the VM to start + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check current status + status = await self._get_vm_status(vm_id) + if status == "RUNNING": + return { + "success": True, + "message": f"VM {vm_id} is already running", + "status": status + } + + # Start the VM + try: + result = await self.client.post(f"/vm/id/{vm_id}/start") + except Exception as e: + return { + "success": False, + "error": f"Failed to start VM {vm_id}: {str(e)}" + } + + # Poll for running state + final_status = await self._wait_for_vm_status(vm_id, "RUNNING") + + return { + "success": final_status == "RUNNING", + "vm_id": vm_id, + "status": final_status, + "message": ( + f"VM {vm_id} started successfully" + if final_status == "RUNNING" + else f"VM {vm_id} may still be starting (current status: {final_status})" + ) + } + + @tool_handler + async def stop_legacy_vm( + self, + vm_id: int, + force: bool = False + ) -> Dict[str, Any]: + """ + Stop a legacy VM + + Args: + vm_id: Numeric ID of the VM to stop + force: Force stop (poweroff) without graceful shutdown + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check current status + status = await self._get_vm_status(vm_id) + if status == "STOPPED": + return { + "success": True, + "message": f"VM {vm_id} is already stopped", + "status": status + } + + # Stop the VM + endpoint = f"/vm/id/{vm_id}/stop" + body = {} + if force: + body["force"] = True + + try: + result = await self.client.post(endpoint, body if body else None) + except Exception as e: + return { + "success": False, + "error": f"Failed to stop VM {vm_id}: {str(e)}" + } + + # Poll for stopped state + final_status = await self._wait_for_vm_status(vm_id, "STOPPED") + + return { + "success": final_status == "STOPPED", + "vm_id": vm_id, + "status": final_status, + "message": ( + f"VM {vm_id} stopped successfully" + if final_status == "STOPPED" + else f"VM {vm_id} may still be stopping (current status: {final_status})" + ) + } + + @tool_handler + async def restart_legacy_vm(self, vm_id: int) -> Dict[str, Any]: + """ + Restart a legacy VM + + Args: + vm_id: Numeric ID of the VM to restart + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check if VM exists + try: + vm = await self.client.get(f"/vm/id/{vm_id}") + except Exception: + return { + "success": False, + "error": f"VM with ID {vm_id} not found" + } + + initial_status = await self._get_vm_status(vm_id) + + # Stop if running + if initial_status == "RUNNING": + try: + await self.client.post(f"/vm/id/{vm_id}/stop") + await self._wait_for_vm_status(vm_id, "STOPPED") + except Exception as e: + return { + "success": False, + "error": f"Failed to stop VM {vm_id}: {str(e)}" + } + + # Start the VM + try: + await self.client.post(f"/vm/id/{vm_id}/start") + except Exception as e: + return { + "success": False, + "error": f"Failed to start VM {vm_id}: {str(e)}" + } + + final_status = await self._wait_for_vm_status(vm_id, "RUNNING") + + return { + "success": final_status == "RUNNING", + "vm_id": vm_id, + "status": final_status, + "message": ( + f"VM {vm_id} restarted successfully" + if final_status == "RUNNING" + else f"VM {vm_id} restart may still be in progress" + ) + } + + @tool_handler + async def update_legacy_vm( + self, + vm_id: int, + name: Optional[str] = None, + vcpus: Optional[int] = None, + memory: Optional[int] = None, + autostart: Optional[bool] = None + ) -> Dict[str, Any]: + """ + Update legacy VM configuration + + NOTE: Some changes may require the VM to be stopped first. + + Args: + vm_id: Numeric ID of the VM to update + name: New VM name + vcpus: Number of virtual CPUs + memory: Memory in MB + autostart: Start on boot + + Returns: + Dictionary containing operation result + """ + await self.ensure_initialized() + + # Check if VM exists + try: + vm = await self.client.get(f"/vm/id/{vm_id}") + except Exception: + return { + "success": False, + "error": f"VM with ID {vm_id} not found" + } + + was_running = await self._get_vm_status(vm_id) == "RUNNING" + + # Build update body - only include provided fields + update_body: Dict[str, Any] = {} + if name is not None: + update_body["name"] = name + if vcpus is not None: + update_body["vcpus"] = vcpus + if memory is not None: + update_body["memory"] = memory + if autostart is not None: + update_body["autostart"] = autostart + + if not update_body: + return { + "success": False, + "error": "No update parameters provided" + } + + # Update the VM + try: + result = await self.client.put(f"/vm/id/{vm_id}", update_body) + except Exception as e: + return { + "success": False, + "error": f"Failed to update VM {vm_id}: {str(e)}" + } + + # Get updated VM info + try: + updated_vm = await self.client.get(f"/vm/id/{vm_id}") + except Exception: + updated_vm = {} + + return { + "success": True, + "vm_id": vm_id, + "message": f"VM {vm_id} configuration updated", + "updated_values": update_body, + "current_config": { + "name": updated_vm.get("name"), + "vcpus": updated_vm.get("vcpus"), + "memory_mb": updated_vm.get("memory"), + "autostart": updated_vm.get("autostart"), + }, + "note": ( + "VM may need restart for CPU/memory changes to take effect" + if was_running and (vcpus or memory) else None + ) + } + + @tool_handler + async def get_legacy_vm_status(self, vm_id: int) -> Dict[str, Any]: + """ + Get the runtime status of a legacy VM + + Args: + vm_id: Numeric ID of the VM + + Returns: + Dictionary containing VM status + """ + await self.ensure_initialized() + + status = await self._get_vm_status(vm_id) + + return { + "success": True, + "vm_id": vm_id, + "status": status + } + + async def _get_vm_status(self, vm_id: int) -> str: + """ + Get the status of a VM + + Args: + vm_id: Numeric ID of the VM + + Returns: + Status string (RUNNING, STOPPED, etc.) + """ + try: + result = await self.client.get(f"/vm/id/{vm_id}/status") + if isinstance(result, dict): + return result.get("state", "UNKNOWN") + return str(result) if result else "UNKNOWN" + except Exception as e: + self.logger.warning(f"Failed to get VM {vm_id} status: {e}") + return "UNKNOWN" + + async def _wait_for_vm_status( + self, + vm_id: int, + target_status: str, + timeout: Optional[int] = None + ) -> str: + """ + Wait for a VM to reach a target status + + Args: + vm_id: Numeric ID of the VM + target_status: Status to wait for (e.g., "RUNNING", "STOPPED") + timeout: Optional timeout in seconds + + Returns: + Final status of the VM + """ + timeout = timeout or self.VM_OPERATION_TIMEOUT + max_attempts = timeout // self.POLL_INTERVAL + + for _ in range(max_attempts): + status = await self._get_vm_status(vm_id) + + if status == target_status: + return status + + # If we hit an error, return immediately + if status == "ERROR": + return status + + await asyncio.sleep(self.POLL_INTERVAL) + + # Return last known status + return await self._get_vm_status(vm_id)