diff --git a/README.md b/README.md index df34d70..929745f 100644 --- a/README.md +++ b/README.md @@ -247,6 +247,74 @@ asyncio.run(main()) - `test_connection` - Verify API connectivity - `get_server_stats` - Server statistics +## 📄 Pagination and Response Control + +All list operations support pagination to reduce token usage when working with LLM clients. Get operations support optional raw API response inclusion for debugging. + +### Pagination Parameters + +All `list_*` tools support these parameters: + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `limit` | integer | 100 | Maximum items to return (max: 500) | +| `offset` | integer | 0 | Number of items to skip | + +**Response format:** +```json +{ + "success": true, + "items": [...], + "metadata": { ... }, + "pagination": { + "total": 250, + "limit": 100, + "offset": 0, + "returned": 100, + "has_more": true + } +} +``` + +**Usage examples:** +``` +"List the first 10 datasets" → limit=10 +"Show users 50-100" → limit=50, offset=50 +"Get all SMB shares (up to 500)" → limit=500 +``` + +### Include Raw API Response + +Get operations for apps, instances, and VMs support the `include_raw` parameter: + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `include_raw` | boolean | false | Include full API response for debugging | + +**When to use `include_raw=true`:** +- Debugging API response structure +- Accessing fields not included in the formatted response +- Troubleshooting integration issues + +**Tools supporting `include_raw`:** +- `get_app` - App details +- `get_instance` - Incus instance details +- `get_legacy_vm` - Legacy VM details + +### Dataset Response Control + +The `list_datasets` and `get_dataset` tools support an additional parameter: + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `include_children` | boolean | true | Include child datasets (can reduce payload significantly) | + +**Usage:** +``` +"List only top-level datasets" → include_children=false +"Get tank dataset without children" → include_children=false +``` + ## 🏗️ Architecture ``` diff --git a/truenas_mcp_server/tools/apps.py b/truenas_mcp_server/tools/apps.py index e6fcdd9..e67b5c7 100644 --- a/truenas_mcp_server/tools/apps.py +++ b/truenas_mcp_server/tools/apps.py @@ -27,10 +27,16 @@ class AppTools(BaseTool): def get_tool_definitions(self) -> list: """Get tool definitions for app management""" return [ - ("list_apps", self.list_apps, "List all TrueNAS apps with status", {}), + ("list_apps", self.list_apps, "List all TrueNAS apps with status", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("get_app", self.get_app, "Get detailed information about a specific app", {"app_name": {"type": "string", "required": True, - "description": "Name of the app"}}), + "description": "Name of the app"}, + "include_raw": {"type": "boolean", "required": False, + "description": "Include full API response for debugging (default: false)"}}), ("get_app_config", self.get_app_config, "Get the full configuration of an app", {"app_name": {"type": "string", "required": True, @@ -59,10 +65,18 @@ def get_tool_definitions(self) -> list: ] @tool_handler - async def list_apps(self) -> Dict[str, Any]: + async def list_apps( + self, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List all TrueNAS apps with their status + Args: + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of apps with status and metadata """ @@ -95,28 +109,39 @@ async def list_apps(self) -> Dict[str, Any]: } app_list.append(app_info) - # Count by state + # Count by state (before pagination) state_counts = {} for app in app_list: state = app["state"] state_counts[state] = state_counts.get(state, 0) + 1 + total_apps = len(app_list) + + # Apply pagination + paginated_apps, pagination = self.apply_pagination(app_list, limit, offset) + return { "success": True, - "apps": app_list, + "apps": paginated_apps, "metadata": { - "total_apps": len(app_list), + "total_apps": total_apps, "state_counts": state_counts, - } + }, + "pagination": pagination } @tool_handler - async def get_app(self, app_name: str) -> Dict[str, Any]: + async def get_app( + self, + app_name: str, + include_raw: bool = False + ) -> Dict[str, Any]: """ Get detailed information about a specific app Args: app_name: Name of the app + include_raw: Include full API response for debugging (default: false) Returns: Dictionary containing app details @@ -140,7 +165,7 @@ async def get_app(self, app_name: str) -> Dict[str, Any]: "error": f"App '{app_name}' not found" } - return { + result = { "success": True, "app": { "name": app.get("id") or app.get("name"), @@ -150,10 +175,14 @@ async def get_app(self, app_name: str) -> Dict[str, Any]: "upgrade_available": app.get("upgrade_available", False), "portal": app.get("portal"), "metadata": app.get("metadata", {}), - }, - "raw": app # Include full response for debugging + } } + if include_raw: + result["raw"] = app + + return result + @tool_handler async def get_app_config(self, app_name: str) -> Dict[str, Any]: """ diff --git a/truenas_mcp_server/tools/base.py b/truenas_mcp_server/tools/base.py index ac7d457..14db73f 100644 --- a/truenas_mcp_server/tools/base.py +++ b/truenas_mcp_server/tools/base.py @@ -4,7 +4,7 @@ import logging from functools import wraps -from typing import Any, Dict, Optional, Callable +from typing import Any, Dict, List, Optional, Callable, Tuple from abc import ABC, abstractmethod from ..client import TrueNASClient @@ -66,14 +66,19 @@ async def wrapper(self, *args, **kwargs) -> Dict[str, Any]: class BaseTool(ABC): """ Base class for all MCP tools - + Provides common functionality for tool implementations including: - Client management - Configuration access - Logging setup - Error handling utilities + - Pagination support """ - + + # Pagination defaults + DEFAULT_LIMIT = 100 + MAX_LIMIT = 500 + def __init__(self, client: Optional[TrueNASClient] = None, settings: Optional[Settings] = None): """ Initialize the tool @@ -179,18 +184,51 @@ def parse_size(self, size_str: str) -> int: def validate_required_fields(self, data: Dict[str, Any], required: list) -> bool: """ Validate that required fields are present in data - + Args: data: Data dictionary to validate required: List of required field names - + Returns: True if all required fields are present - + Raises: ValueError: If any required fields are missing """ missing = [field for field in required if field not in data or data[field] is None] if missing: raise ValueError(f"Missing required fields: {', '.join(missing)}") - return True \ No newline at end of file + return True + + def apply_pagination( + self, + items: List[Any], + limit: int = DEFAULT_LIMIT, + offset: int = 0 + ) -> Tuple[List[Any], Dict[str, Any]]: + """ + Apply pagination to a list of items + + Args: + items: Full list of items to paginate + limit: Maximum items to return (capped at MAX_LIMIT) + offset: Number of items to skip + + Returns: + Tuple of (paginated_items, pagination_metadata) + """ + # Cap limit at MAX_LIMIT + limit = min(limit, self.MAX_LIMIT) + + total = len(items) + paginated = items[offset:offset + limit] + + pagination = { + "total": total, + "limit": limit, + "offset": offset, + "returned": len(paginated), + "has_more": offset + limit < total + } + + return paginated, pagination \ No newline at end of file diff --git a/truenas_mcp_server/tools/instances.py b/truenas_mcp_server/tools/instances.py index d474906..3d22d4b 100644 --- a/truenas_mcp_server/tools/instances.py +++ b/truenas_mcp_server/tools/instances.py @@ -31,11 +31,17 @@ def get_tool_definitions(self) -> list: ("list_instances", self.list_instances, "List all Incus instances (VMs and Containers)", {"instance_type": {"type": "string", "required": False, - "description": "Filter by type: 'VM' or 'CONTAINER' (optional)"}}), + "description": "Filter by type: 'VM' or 'CONTAINER' (optional)"}, + "limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("get_instance", self.get_instance, "Get detailed information about a specific instance", {"instance_name": {"type": "string", "required": True, - "description": "Name of the instance"}}), + "description": "Name of the instance"}, + "include_raw": {"type": "boolean", "required": False, + "description": "Include full API response for debugging (default: false)"}}), ("start_instance", self.start_instance, "Start an Incus instance", {"instance_name": {"type": "string", "required": True, "description": "Name of the instance to start"}}), @@ -68,13 +74,17 @@ def get_tool_definitions(self) -> list: @tool_handler async def list_instances( self, - instance_type: Optional[str] = None + instance_type: Optional[str] = None, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 ) -> Dict[str, Any]: """ List all Incus instances (VMs and Containers) Args: instance_type: Optional filter by type ('VM' or 'CONTAINER') + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination Returns: Dictionary containing list of instances with status @@ -107,7 +117,7 @@ async def list_instances( } instance_list.append(instance_info) - # Count by type and status + # Count by type and status (before pagination) type_counts = {} status_counts = {} for inst in instance_list: @@ -117,23 +127,34 @@ async def list_instances( status = inst["status"] status_counts[status] = status_counts.get(status, 0) + 1 + total_instances = len(instance_list) + + # Apply pagination + paginated_instances, pagination = self.apply_pagination(instance_list, limit, offset) + return { "success": True, - "instances": instance_list, + "instances": paginated_instances, "metadata": { - "total_instances": len(instance_list), + "total_instances": total_instances, "type_counts": type_counts, "status_counts": status_counts, - } + }, + "pagination": pagination } @tool_handler - async def get_instance(self, instance_name: str) -> Dict[str, Any]: + async def get_instance( + self, + instance_name: str, + include_raw: bool = False + ) -> Dict[str, Any]: """ Get detailed information about a specific instance Args: instance_name: Name of the instance + include_raw: Include full API response for debugging (default: false) Returns: Dictionary containing instance details @@ -153,7 +174,7 @@ async def get_instance(self, instance_name: str) -> Dict[str, Any]: memory_bytes = inst.get("memory", 0) memory_gb = round(memory_bytes / (1024**3), 2) - return { + result = { "success": True, "instance": { "id": inst.get("id"), @@ -166,10 +187,14 @@ async def get_instance(self, instance_name: str) -> Dict[str, Any]: "autostart": inst.get("autostart"), "image": inst.get("image"), "environment": inst.get("environment", {}), - }, - "raw": inst # Include full response for debugging + } } + if include_raw: + result["raw"] = inst + + return result + @tool_handler async def start_instance(self, instance_name: str) -> Dict[str, Any]: """ diff --git a/truenas_mcp_server/tools/sharing.py b/truenas_mcp_server/tools/sharing.py index 77a9994..d56794e 100644 --- a/truenas_mcp_server/tools/sharing.py +++ b/truenas_mcp_server/tools/sharing.py @@ -13,7 +13,11 @@ def get_tool_definitions(self) -> list: """Get tool definitions for sharing management""" return [ # SMB Tools - ("list_smb_shares", self.list_smb_shares, "List all SMB shares", {}), + ("list_smb_shares", self.list_smb_shares, "List all SMB shares", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("create_smb_share", self.create_smb_share, "Create a new SMB share", {"path": {"type": "string", "required": True}, "name": {"type": "string", "required": True}, @@ -21,9 +25,13 @@ def get_tool_definitions(self) -> list: "read_only": {"type": "boolean", "required": False}}), ("delete_smb_share", self.delete_smb_share, "Delete an SMB share", {"share_name": {"type": "string", "required": True}}), - + # NFS Tools - ("list_nfs_exports", self.list_nfs_exports, "List all NFS exports", {}), + ("list_nfs_exports", self.list_nfs_exports, "List all NFS exports", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("create_nfs_export", self.create_nfs_export, "Create an NFS export", {"path": {"type": "string", "required": True}, "allowed_networks": {"type": "array", "required": False}, @@ -32,9 +40,13 @@ def get_tool_definitions(self) -> list: "maproot_group": {"type": "string", "required": False}}), ("delete_nfs_export", self.delete_nfs_export, "Delete an NFS export", {"export_id": {"type": "integer", "required": True}}), - + # iSCSI Tools - ("list_iscsi_targets", self.list_iscsi_targets, "List all iSCSI targets", {}), + ("list_iscsi_targets", self.list_iscsi_targets, "List all iSCSI targets", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("create_iscsi_target", self.create_iscsi_target, "Create an iSCSI target", {"name": {"type": "string", "required": True}, "alias": {"type": "string", "required": False}}), @@ -43,17 +55,25 @@ def get_tool_definitions(self) -> list: # SMB Share Management @tool_handler - async def list_smb_shares(self) -> Dict[str, Any]: + async def list_smb_shares( + self, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List all SMB shares - + + Args: + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of SMB shares """ await self.ensure_initialized() - + shares = await self.client.get("/sharing/smb") - + share_list = [] for share in shares: share_info = { @@ -73,17 +93,28 @@ async def list_smb_shares(self) -> Dict[str, Any]: "audit": share.get("audit", {}) } share_list.append(share_info) - + + # Calculate counts before pagination + total_shares = len(share_list) + enabled_shares = sum(1 for s in share_list if s["enabled"]) + read_only_shares = sum(1 for s in share_list if s["read_only"]) + guest_shares = sum(1 for s in share_list if s["guest_ok"]) + timemachine_shares = sum(1 for s in share_list if s["timemachine"]) + + # Apply pagination + paginated_shares, pagination = self.apply_pagination(share_list, limit, offset) + return { "success": True, - "shares": share_list, + "shares": paginated_shares, "metadata": { - "total_shares": len(share_list), - "enabled_shares": sum(1 for s in share_list if s["enabled"]), - "read_only_shares": sum(1 for s in share_list if s["read_only"]), - "guest_shares": sum(1 for s in share_list if s["guest_ok"]), - "timemachine_shares": sum(1 for s in share_list if s["timemachine"]) - } + "total_shares": total_shares, + "enabled_shares": enabled_shares, + "read_only_shares": read_only_shares, + "guest_shares": guest_shares, + "timemachine_shares": timemachine_shares + }, + "pagination": pagination } @tool_handler @@ -194,17 +225,25 @@ async def delete_smb_share(self, share_name: str) -> Dict[str, Any]: # NFS Export Management @tool_handler - async def list_nfs_exports(self) -> Dict[str, Any]: + async def list_nfs_exports( + self, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List all NFS exports - + + Args: + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of NFS exports """ await self.ensure_initialized() - + exports = await self.client.get("/sharing/nfs") - + export_list = [] for export in exports: export_info = { @@ -223,16 +262,26 @@ async def list_nfs_exports(self) -> Dict[str, Any]: "security": export.get("security", []) } export_list.append(export_info) - + + # Calculate counts before pagination + total_exports = len(export_list) + enabled_exports = sum(1 for e in export_list if e["enabled"]) + read_only_exports = sum(1 for e in export_list if e["read_only"]) + alldirs_exports = sum(1 for e in export_list if e["alldirs"]) + + # Apply pagination + paginated_exports, pagination = self.apply_pagination(export_list, limit, offset) + return { "success": True, - "exports": export_list, + "exports": paginated_exports, "metadata": { - "total_exports": len(export_list), - "enabled_exports": sum(1 for e in export_list if e["enabled"]), - "read_only_exports": sum(1 for e in export_list if e["read_only"]), - "alldirs_exports": sum(1 for e in export_list if e["alldirs"]) - } + "total_exports": total_exports, + "enabled_exports": enabled_exports, + "read_only_exports": read_only_exports, + "alldirs_exports": alldirs_exports + }, + "pagination": pagination } @tool_handler @@ -348,15 +397,23 @@ async def delete_nfs_export(self, export_id: int) -> Dict[str, Any]: # iSCSI Management @tool_handler - async def list_iscsi_targets(self) -> Dict[str, Any]: + async def list_iscsi_targets( + self, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List all iSCSI targets - + + Args: + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of iSCSI targets """ await self.ensure_initialized() - + targets = await self.client.get("/iscsi/target") extents = await self.client.get("/iscsi/extent") target_extents = await self.client.get("/iscsi/targetextent") @@ -401,15 +458,24 @@ async def list_iscsi_targets(self) -> Dict[str, Any]: "extents": target_extents_info } target_list.append(target_info) - + + # Calculate counts before pagination + total_targets = len(target_list) + total_extents = len(extents) + targets_with_extents = sum(1 for t in target_list if t["extents"]) + + # Apply pagination + paginated_targets, pagination = self.apply_pagination(target_list, limit, offset) + return { "success": True, - "targets": target_list, + "targets": paginated_targets, "metadata": { - "total_targets": len(target_list), - "total_extents": len(extents), - "targets_with_extents": sum(1 for t in target_list if t["extents"]) - } + "total_targets": total_targets, + "total_extents": total_extents, + "targets_with_extents": targets_with_extents + }, + "pagination": pagination } @tool_handler diff --git a/truenas_mcp_server/tools/snapshots.py b/truenas_mcp_server/tools/snapshots.py index b7a68db..542fd4f 100644 --- a/truenas_mcp_server/tools/snapshots.py +++ b/truenas_mcp_server/tools/snapshots.py @@ -14,7 +14,12 @@ def get_tool_definitions(self) -> list: """Get tool definitions for snapshot management""" return [ ("list_snapshots", self.list_snapshots, "List snapshots for a dataset", - {"dataset": {"type": "string", "required": False}}), + {"dataset": {"type": "string", "required": False, + "description": "Filter by dataset path"}, + "limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("create_snapshot", self.create_snapshot, "Create a snapshot of a dataset", {"dataset": {"type": "string", "required": True}, "name": {"type": "string", "required": False}, @@ -27,7 +32,11 @@ def get_tool_definitions(self) -> list: ("clone_snapshot", self.clone_snapshot, "Clone a snapshot to a new dataset", {"snapshot": {"type": "string", "required": True}, "target": {"type": "string", "required": True}}), - ("list_snapshot_tasks", self.list_snapshot_tasks, "List automated snapshot tasks", {}), + ("list_snapshot_tasks", self.list_snapshot_tasks, "List automated snapshot tasks", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("create_snapshot_task", self.create_snapshot_task, "Create automated snapshot task", {"dataset": {"type": "string", "required": True}, "schedule": {"type": "object", "required": True}, @@ -36,25 +45,32 @@ def get_tool_definitions(self) -> list: ] @tool_handler - async def list_snapshots(self, dataset: Optional[str] = None) -> Dict[str, Any]: + async def list_snapshots( + self, + dataset: Optional[str] = None, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List snapshots for a dataset or all snapshots - + Args: dataset: Optional dataset path to filter snapshots - + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of snapshots """ await self.ensure_initialized() - + # Get all snapshots params = {} if dataset: params["dataset"] = dataset - + snapshots = await self.client.get("/zfs/snapshot", params) - + snapshot_list = [] for snap in snapshots: # Parse snapshot name to extract dataset and snapshot name @@ -84,22 +100,26 @@ async def list_snapshots(self, dataset: Optional[str] = None) -> Dict[str, Any]: # Sort by creation time (newest first) snapshot_list.sort(key=lambda x: x.get("created", 0), reverse=True) - # Group by dataset + # Group by dataset (before pagination for accurate counts) by_dataset = {} for snap in snapshot_list: ds = snap["dataset"] if ds not in by_dataset: by_dataset[ds] = [] by_dataset[ds].append(snap) - + + # Apply pagination + paginated_snapshots, pagination = self.apply_pagination(snapshot_list, limit, offset) + return { "success": True, - "snapshots": snapshot_list, + "snapshots": paginated_snapshots, "metadata": { "total_snapshots": len(snapshot_list), "datasets_with_snapshots": len(by_dataset), "by_dataset": {ds: len(snaps) for ds, snaps in by_dataset.items()} - } + }, + "pagination": pagination } @tool_handler @@ -293,15 +313,23 @@ async def clone_snapshot(self, snapshot: str, target: str) -> Dict[str, Any]: } @tool_handler - async def list_snapshot_tasks(self) -> Dict[str, Any]: + async def list_snapshot_tasks( + self, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List automated snapshot tasks - + + Args: + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of snapshot tasks """ await self.ensure_initialized() - + tasks = await self.client.get("/pool/snapshottask") task_list = [] @@ -325,15 +353,24 @@ async def list_snapshot_tasks(self) -> Dict[str, Any]: "allow_empty": task.get("allow_empty", True) } task_list.append(task_info) - + + # Calculate counts before pagination + total_tasks = len(task_list) + enabled_tasks = sum(1 for t in task_list if t["enabled"]) + recursive_tasks = sum(1 for t in task_list if t["recursive"]) + + # Apply pagination + paginated_tasks, pagination = self.apply_pagination(task_list, limit, offset) + return { "success": True, - "tasks": task_list, + "tasks": paginated_tasks, "metadata": { - "total_tasks": len(task_list), - "enabled_tasks": sum(1 for t in task_list if t["enabled"]), - "recursive_tasks": sum(1 for t in task_list if t["recursive"]) - } + "total_tasks": total_tasks, + "enabled_tasks": enabled_tasks, + "recursive_tasks": recursive_tasks + }, + "pagination": pagination } @tool_handler diff --git a/truenas_mcp_server/tools/storage.py b/truenas_mcp_server/tools/storage.py index 58eef2d..0ca467c 100644 --- a/truenas_mcp_server/tools/storage.py +++ b/truenas_mcp_server/tools/storage.py @@ -12,12 +12,24 @@ class StorageTools(BaseTool): def get_tool_definitions(self) -> list: """Get tool definitions for storage management""" return [ - ("list_pools", self.list_pools, "List all storage pools", {}), + ("list_pools", self.list_pools, "List all storage pools", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("get_pool_status", self.get_pool_status, "Get detailed status of a specific pool", {"pool_name": {"type": "string", "required": True}}), - ("list_datasets", self.list_datasets, "List all datasets", {}), + ("list_datasets", self.list_datasets, "List all datasets", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}, + "include_children": {"type": "boolean", "required": False, + "description": "Include child datasets in response (default: true)"}}), ("get_dataset", self.get_dataset, "Get detailed information about a dataset", - {"dataset": {"type": "string", "required": True}}), + {"dataset": {"type": "string", "required": True}, + "include_children": {"type": "boolean", "required": False, + "description": "Include child datasets in response (default: true)"}}), ("create_dataset", self.create_dataset, "Create a new dataset", {"pool": {"type": "string", "required": True}, "name": {"type": "string", "required": True}, @@ -33,17 +45,21 @@ def get_tool_definitions(self) -> list: ] @tool_handler - async def list_pools(self) -> Dict[str, Any]: + async def list_pools(self, limit: int = 100, offset: int = 0) -> Dict[str, Any]: """ List all storage pools - + + Args: + limit: Maximum items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of pools with their status """ await self.ensure_initialized() - + pools = await self.client.get("/pool") - + pool_list = [] for pool in pools: # Calculate usage percentage @@ -51,7 +67,7 @@ async def list_pools(self) -> Dict[str, Any]: allocated = pool.get("allocated", 0) free = pool.get("free", 0) usage_percent = (allocated / size * 100) if size > 0 else 0 - + pool_info = { "name": pool.get("name"), "status": pool.get("status"), @@ -71,17 +87,20 @@ async def list_pools(self) -> Dict[str, Any]: } } pool_list.append(pool_info) - - # Calculate totals + + # Calculate totals (before pagination) total_size = sum(p.get("size", 0) for p in pools) total_allocated = sum(p.get("allocated", 0) for p in pools) total_free = sum(p.get("free", 0) for p in pools) - + + # Apply pagination + paginated_pools, pagination = self.apply_pagination(pool_list, limit, offset) + return { "success": True, - "pools": pool_list, + "pools": paginated_pools, + "pagination": pagination, "metadata": { - "pool_count": len(pool_list), "healthy_pools": sum(1 for p in pool_list if p["healthy"]), "degraded_pools": sum(1 for p in pool_list if not p["healthy"]), "total_capacity": self.format_size(total_size), @@ -184,23 +203,33 @@ async def get_pool_status(self, pool_name: str) -> Dict[str, Any]: } @tool_handler - async def list_datasets(self) -> Dict[str, Any]: + async def list_datasets( + self, + limit: int = 100, + offset: int = 0, + include_children: bool = True + ) -> Dict[str, Any]: """ List all datasets - + + Args: + limit: Maximum items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + include_children: Include child datasets in response (default: true) + Returns: Dictionary containing list of datasets """ await self.ensure_initialized() - + datasets = await self.client.get("/pool/dataset") - + dataset_list = [] for ds in datasets: # Calculate usage used = ds.get("used", {}).get("parsed") if isinstance(ds.get("used"), dict) else ds.get("used", 0) available = ds.get("available", {}).get("parsed") if isinstance(ds.get("available"), dict) else ds.get("available", 0) - + dataset_info = { "name": ds.get("name"), "pool": ds.get("pool"), @@ -212,23 +241,29 @@ async def list_datasets(self) -> Dict[str, Any]: "used": self.format_size(used) if isinstance(used, (int, float)) else str(used), "available": self.format_size(available) if isinstance(available, (int, float)) else str(available), "quota": ds.get("quota", {}).get("value") if isinstance(ds.get("quota"), dict) else ds.get("quota"), - "children": ds.get("children", []) } + # Only include children if requested + if include_children: + dataset_info["children"] = ds.get("children", []) + dataset_list.append(dataset_info) - - # Organize by pool + + # Organize by pool (before pagination) pools_datasets = {} for ds in dataset_list: pool = ds["pool"] if pool not in pools_datasets: pools_datasets[pool] = [] pools_datasets[pool].append(ds) - + + # Apply pagination + paginated_datasets, pagination = self.apply_pagination(dataset_list, limit, offset) + return { "success": True, - "datasets": dataset_list, + "datasets": paginated_datasets, + "pagination": pagination, "metadata": { - "total_datasets": len(dataset_list), "by_pool": {pool: len(datasets) for pool, datasets in pools_datasets.items()}, "encrypted_datasets": sum(1 for ds in dataset_list if ds.get("encrypted")), "compressed_datasets": sum(1 for ds in dataset_list if ds.get("compression") and ds.get("compression") != "off") @@ -236,32 +271,33 @@ async def list_datasets(self) -> Dict[str, Any]: } @tool_handler - async def get_dataset(self, dataset: str) -> Dict[str, Any]: + async def get_dataset(self, dataset: str, include_children: bool = True) -> Dict[str, Any]: """ Get detailed information about a dataset - + Args: dataset: Dataset path (e.g., "tank/data") - + include_children: Include child datasets in response (default: true) + Returns: Dictionary containing dataset details """ await self.ensure_initialized() - + datasets = await self.client.get("/pool/dataset") - + target_dataset = None for ds in datasets: if ds.get("name") == dataset: target_dataset = ds break - + if not target_dataset: return { "success": False, "error": f"Dataset '{dataset}' not found" } - + # Extract all properties properties = {} for key in ["compression", "deduplication", "atime", "sync", "quota", "refquota", @@ -272,8 +308,8 @@ async def get_dataset(self, dataset: str) -> Dict[str, Any]: properties[key] = value.get("value") else: properties[key] = value - - return { + + result = { "success": True, "dataset": { "name": target_dataset.get("name"), @@ -293,11 +329,16 @@ async def get_dataset(self, dataset: str) -> Dict[str, Any]: "usedbychildren": target_dataset.get("usedbychildren", {}).get("value") if isinstance(target_dataset.get("usedbychildren"), dict) else target_dataset.get("usedbychildren") }, "properties": properties, - "children": target_dataset.get("children", []), "snapshot_count": target_dataset.get("snapshot_count", 0), "origin": target_dataset.get("origin", {}).get("value") if isinstance(target_dataset.get("origin"), dict) else target_dataset.get("origin") } } + + # Only include children if requested + if include_children: + result["dataset"]["children"] = target_dataset.get("children", []) + + return result @tool_handler async def create_dataset( diff --git a/truenas_mcp_server/tools/users.py b/truenas_mcp_server/tools/users.py index 5f3d857..1d70c1e 100644 --- a/truenas_mcp_server/tools/users.py +++ b/truenas_mcp_server/tools/users.py @@ -12,8 +12,12 @@ class UserTools(BaseTool): def get_tool_definitions(self) -> list: """Get tool definitions for user management""" return [ - ("list_users", self.list_users, "List all users in TrueNAS", {}), - ("get_user", self.get_user, "Get detailed information about a specific user", + ("list_users", self.list_users, "List all users in TrueNAS", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), + ("get_user", self.get_user, "Get detailed information about a specific user", {"username": {"type": "string", "required": True}}), ("create_user", self.create_user, "Create a new user", {"username": {"type": "string", "required": True}, @@ -31,17 +35,25 @@ def get_tool_definitions(self) -> list: ] @tool_handler - async def list_users(self) -> Dict[str, Any]: + async def list_users( + self, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List all users in TrueNAS - + + Args: + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of users and metadata """ await self.ensure_initialized() - + users = await self.client.get("/user") - + # Filter and format user data user_list = [] for user in users: @@ -59,20 +71,26 @@ async def list_users(self) -> Dict[str, Any]: "builtin": user.get("builtin", False) } user_list.append(user_info) - - # Categorize users - system_users = [u for u in user_list if u["builtin"]] - regular_users = [u for u in user_list if not u["builtin"]] - + + # Categorize users (before pagination) + system_users = sum(1 for u in user_list if u["builtin"]) + regular_users = sum(1 for u in user_list if not u["builtin"]) + locked_users = sum(1 for u in user_list if u["locked"]) + total_count = len(user_list) + + # Apply pagination + paginated_users, pagination = self.apply_pagination(user_list, limit, offset) + return { "success": True, - "users": user_list, + "users": paginated_users, "metadata": { - "total_count": len(user_list), - "system_users": len(system_users), - "regular_users": len(regular_users), - "locked_users": sum(1 for u in user_list if u["locked"]) - } + "total_count": total_count, + "system_users": system_users, + "regular_users": regular_users, + "locked_users": locked_users + }, + "pagination": pagination } @tool_handler diff --git a/truenas_mcp_server/tools/vms.py b/truenas_mcp_server/tools/vms.py index 0ed754b..38ebcc3 100644 --- a/truenas_mcp_server/tools/vms.py +++ b/truenas_mcp_server/tools/vms.py @@ -25,11 +25,17 @@ def get_tool_definitions(self) -> list: """Get tool definitions for legacy VM management""" return [ ("list_legacy_vms", self.list_legacy_vms, - "List all legacy bhyve VMs", {}), + "List all legacy bhyve VMs", + {"limit": {"type": "integer", "required": False, + "description": "Max items to return (default: 100, max: 500)"}, + "offset": {"type": "integer", "required": False, + "description": "Items to skip for pagination"}}), ("get_legacy_vm", self.get_legacy_vm, "Get detailed information about a legacy VM", {"vm_id": {"type": "integer", "required": True, - "description": "Numeric ID of the VM"}}), + "description": "Numeric ID of the VM"}, + "include_raw": {"type": "boolean", "required": False, + "description": "Include full API response for debugging (default: false)"}}), ("start_legacy_vm", self.start_legacy_vm, "Start a legacy VM", {"vm_id": {"type": "integer", "required": True, @@ -63,10 +69,18 @@ def get_tool_definitions(self) -> list: ] @tool_handler - async def list_legacy_vms(self) -> Dict[str, Any]: + async def list_legacy_vms( + self, + limit: int = BaseTool.DEFAULT_LIMIT, + offset: int = 0 + ) -> Dict[str, Any]: """ List all legacy bhyve VMs + Args: + limit: Maximum number of items to return (default: 100, max: 500) + offset: Number of items to skip for pagination + Returns: Dictionary containing list of VMs with their status """ @@ -92,28 +106,39 @@ async def list_legacy_vms(self) -> Dict[str, Any]: } vm_list.append(vm_info) - # Count by status + # Count by status (before pagination) status_counts = {} for vm in vm_list: status = vm["status"] status_counts[status] = status_counts.get(status, 0) + 1 + total_vms = len(vm_list) + + # Apply pagination + paginated_vms, pagination = self.apply_pagination(vm_list, limit, offset) + return { "success": True, - "vms": vm_list, + "vms": paginated_vms, "metadata": { - "total_vms": len(vm_list), + "total_vms": total_vms, "status_counts": status_counts, - } + }, + "pagination": pagination } @tool_handler - async def get_legacy_vm(self, vm_id: int) -> Dict[str, Any]: + async def get_legacy_vm( + self, + vm_id: int, + include_raw: bool = False + ) -> Dict[str, Any]: """ Get detailed information about a legacy VM Args: vm_id: Numeric ID of the VM + include_raw: Include full API response for debugging (default: false) Returns: Dictionary containing VM details @@ -141,7 +166,7 @@ async def get_legacy_vm(self, vm_id: int) -> Dict[str, Any]: "attributes": device.get("attributes", {}), }) - return { + result = { "success": True, "vm": { "id": vm.get("id"), @@ -158,10 +183,14 @@ async def get_legacy_vm(self, vm_id: int) -> Dict[str, Any]: "cpu_model": vm.get("cpu_model"), "status": status, "devices": devices, - }, - "raw": vm # Include full response for debugging + } } + if include_raw: + result["raw"] = vm + + return result + @tool_handler async def start_legacy_vm(self, vm_id: int) -> Dict[str, Any]: """