diff --git a/docs/source/reference/package-apis/drivers/index.md b/docs/source/reference/package-apis/drivers/index.md index 2a5e173c..7db26160 100644 --- a/docs/source/reference/package-apis/drivers/index.md +++ b/docs/source/reference/package-apis/drivers/index.md @@ -60,6 +60,7 @@ Drivers that handle media streams: * **[UStreamer](ustreamer.md)** (`jumpstarter-driver-ustreamer`) - Video streaming functionality +* **[NanoKVM](nanokvm.md)** (`jumpstarter-driver-nanokvm`) - NanoKVM remote KVM control ### Debug and Programming Drivers @@ -95,6 +96,7 @@ flashers.md http.md http-power.md iscsi.md +nanokvm.md network.md opendal.md power.md diff --git a/docs/source/reference/package-apis/drivers/nanokvm.md b/docs/source/reference/package-apis/drivers/nanokvm.md new file mode 120000 index 00000000..dfb1e4b7 --- /dev/null +++ b/docs/source/reference/package-apis/drivers/nanokvm.md @@ -0,0 +1 @@ +../../../../../packages/jumpstarter-driver-nanokvm/README.md \ No newline at end of file diff --git a/packages/jumpstarter-driver-nanokvm/.gitignore b/packages/jumpstarter-driver-nanokvm/.gitignore new file mode 100644 index 00000000..cbc5d672 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +.coverage +coverage.xml diff --git a/packages/jumpstarter-driver-nanokvm/README.md b/packages/jumpstarter-driver-nanokvm/README.md new file mode 100644 index 00000000..09f8eab6 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/README.md @@ -0,0 +1,349 @@ +# NanoKVM Driver + +`jumpstarter-driver-nanokvm` provides comprehensive support for [NanoKVM](https://github.com/sipeed/NanoKVM) devices thanks to the amazing [python-nanokvm](https://github.com/puddly/python-nanokvm) library, enabling remote KVM (Keyboard, Video, Mouse) control over the network. + +## Features + +- **Video Streaming**: Access live video feed from the connected device +- **Snapshot Capture**: Take screenshots of the video stream +- **Keyboard Control**: Send text and keystrokes via HID emulation +- **Mouse Control**: Full mouse support via WebSocket + - Absolute positioning (0.0-1.0 normalized coordinates) + - Relative movement (0.0-1.0 normalized, where 1.0 = full screen) + - Left/right/middle button clicks + - Mouse wheel scrolling +- **Image Management**: Virtual disk and CD-ROM control + - Mount/unmount disk and CD-ROM images + - Download images from URLs + - Check mounted image status + - Monitor download progress +- **Device Management**: Get device info, reboot the NanoKVM +- **Composite Driver**: Access all functionality through a unified interface + +## Installation + +```{code-block} console +:substitutions: +$ pip3 install --extra-index-url {{index_url}} jumpstarter-driver-nanokvm +``` + +## Configuration + +### Basic Configuration + +```yaml +export: + nanokvm: + type: jumpstarter_driver_nanokvm.driver.NanoKVM + config: + host: "nanokvm.local" # Hostname or IP address + username: "admin" # Default NanoKVM web interface username + password: "admin" # Default NanoKVM web interface password +``` + +### Advanced Configuration + +```yaml +export: + nanokvm: + type: jumpstarter_driver_nanokvm.driver.NanoKVM + config: + host: "192.168.1.100" + username: "admin" + password: "your-password" + # Optional: SSH access for serial console (future feature) + enable_serial: false + ssh_username: "root" + ssh_password: "root" + ssh_port: 22 +``` + +### Config Parameters + +| Parameter | Description | Type | Required | Default | +| -------------- | ------------------------------------------ | ----- | -------- | ------- | +| host | NanoKVM hostname or IP address | str | yes | | +| username | Web interface username | str | no | "admin" | +| password | Web interface password | str | no | "admin" | +| enable_serial | Enable serial console access via SSH | bool | no | false | +| ssh_username | SSH username for serial console | str | no | "root" | +| ssh_password | SSH password for serial console | str | no | "root" | +| ssh_port | SSH port for serial console | int | no | 22 | + +## Architecture + +The NanoKVM driver is a composite driver that provides three main interfaces: + +1. **video**: Video streaming and snapshot capture +2. **hid**: Keyboard and mouse HID control +3. **serial**: Serial console access (optional, future feature) + +## API Reference + +### NanoKVMClient + +```{eval-rst} +.. autoclass:: jumpstarter_driver_nanokvm.client.NanoKVMClient() + :members: get_info, reboot, mount_image, download_image, get_mounted_image, get_cdrom_status, is_image_download_enabled, get_image_download_status +``` + +### NanoKVMVideoClient + +```{eval-rst} +.. autoclass:: jumpstarter_driver_nanokvm.client.NanoKVMVideoClient() + :members: snapshot +``` + +### NanoKVMHIDClient + +```{eval-rst} +.. autoclass:: jumpstarter_driver_nanokvm.client.NanoKVMHIDClient() + :members: paste_text, press_key, reset_hid, mouse_move_abs, mouse_move_rel, mouse_click, mouse_scroll +``` + +## CLI Usage + +The NanoKVM driver provides CLI commands accessible through the `jmp shell` command: + +### Main Commands + +```bash +# Get device information +j nanokvm info + +# Reboot the NanoKVM device (with confirmation) +j nanokvm reboot +``` + +### Video Commands + +```bash +# Take a snapshot (saves to snapshot.jpg by default) +j nanokvm video snapshot + +# Take a snapshot with custom filename +j nanokvm video snapshot my_screenshot.jpg +``` + +### HID Commands + +#### Keyboard Commands + +```bash +# Paste text via keyboard HID +j nanokvm hid paste "Hello, World!" + +# Send commands with newline (use $'...' syntax in bash for escape sequences) +j nanokvm hid paste $'root\n' + +# Or use double backslash +j nanokvm hid paste "root\\n" + +# Send multiple lines +j nanokvm hid paste $'ls -la\ndate\n' + +# Press a single key +j nanokvm hid press "a" + +# Press special keys +j nanokvm hid press $'\n' # Enter +j nanokvm hid press $'\t' # Tab + +# Reset HID subsystem if it's not responding +j nanokvm hid reset +``` + +#### Mouse Commands + +```bash +# Move mouse to absolute coordinates (0.0-1.0, where 0.0=top/left, 1.0=bottom/right) +j nanokvm hid mouse move 0.5 0.5 # Center of screen +j nanokvm hid mouse move 0.0 0.0 # Top-left corner +j nanokvm hid mouse move 1.0 1.0 # Bottom-right corner + +# Move mouse relatively (-1.0 to 1.0, where 1.0 = full screen width/height) +j nanokvm hid mouse move-rel 0.1 0.1 # Move right and down by 10% of screen +j nanokvm hid mouse move-rel -0.2 0.0 # Move left by 20% of screen width + +# Click at current position (default: left button) +j nanokvm hid mouse click + +# Click with specific button +j nanokvm hid mouse click --button right + +# Click at specific coordinates +j nanokvm hid mouse click --x 0.5 --y 0.5 --button left + +# Scroll (default: down 5 units) +j nanokvm hid mouse scroll + +# Scroll up +j nanokvm hid mouse scroll --dy 5 + +# Scroll down +j nanokvm hid mouse scroll --dy -5 +``` + +### Image Management Commands + +```bash +# Mount a disk image +j nanokvm image mount /data/myimage.img + +# Mount a CD-ROM image +j nanokvm image mount /data/installer.iso --cdrom + +# Unmount current image +j nanokvm image unmount + +# Check mounted image status +j nanokvm image status + +# Check if mounted image is in CD-ROM mode +j nanokvm image cdrom-status + +# Download an image from URL +j nanokvm image download https://example.com/image.iso + +# Check if image downloads are enabled +j nanokvm image download-enabled + +# Check download progress +j nanokvm image download-status +``` + +### Example Session + +```bash +# Connect to the exporter +jmp shell -l my=device + +# Inside the shell, use the commands +j nanokvm info +j nanokvm video snapshot my_screen.jpg + +# Mount a CD-ROM image +j nanokvm image mount /data/installer.iso --cdrom +j nanokvm image status + +# Control the mouse and keyboard +j nanokvm hid mouse move 0.5 0.5 +j nanokvm hid mouse click +j nanokvm hid paste "echo 'Hello from NanoKVM'\n" +``` + +## Usage Examples + +### Basic Setup + +```python +image = nanokvm.video.snapshot() +image.save("snapshot.jpg") +print(f"Snapshot size: {image.size}") +``` + +### Keyboard Control + +```python +# Paste text to the connected device +nanokvm.hid.paste_text("Hello from Jumpstarter!\n") + +# Send commands +nanokvm.hid.paste_text("ls -la\n") + +# Press individual keys +nanokvm.hid.press_key("a") +nanokvm.hid.press_key("\n") # Enter +nanokvm.hid.press_key("\t") # Tab +``` + +### Mouse Control + +```python +# Move mouse to center of screen (normalized 0.0-1.0 coordinates) +nanokvm.hid.mouse_move_abs(0.5, 0.5) + +# Move to top-left corner +nanokvm.hid.mouse_move_abs(0.0, 0.0) + +# Move to bottom-right corner +nanokvm.hid.mouse_move_abs(1.0, 1.0) + +# Click left button +nanokvm.hid.mouse_click("left") + +# Click at specific coordinates +nanokvm.hid.mouse_click("left", x=0.5, y=0.25) + +# Move mouse relatively (normalized -1.0 to 1.0, where 1.0 = full screen) +nanokvm.hid.mouse_move_rel(0.1, 0.1) # Move right/down by 10% of screen +nanokvm.hid.mouse_move_rel(-0.2, 0.0) # Move left by 20% of screen width + +# Scroll up +nanokvm.hid.mouse_scroll(0, 5) + +# Scroll down +nanokvm.hid.mouse_scroll(0, -5) +``` + +### Image Management + +```python +# Mount a disk image +nanokvm.mount_image("/data/myimage.img", cdrom=False) + +# Mount a CD-ROM image +nanokvm.mount_image("/data/installer.iso", cdrom=True) + +# Unmount current image +nanokvm.mount_image("") + +# Get mounted image info +file = nanokvm.get_mounted_image() +if file: + print(f"Mounted: {file}") + is_cdrom = nanokvm.get_cdrom_status() + print(f"Mode: {'CD-ROM' if is_cdrom else 'Disk'}") + +# Download an image +status = nanokvm.download_image("https://example.com/image.iso") +print(f"Download: {status['status']}, File: {status['file']}") + +# Check if downloads are enabled +if nanokvm.is_image_download_enabled(): + print("Downloads are available") + +# Monitor download progress +status = nanokvm.get_image_download_status() +print(f"Status: {status['status']}, Progress: {status['percentage']}") +``` + +### Device Management + +```python +# Get device info +info = nanokvm.get_info() +print(f"Device: {info['mdns']}") +print(f"IPs: {info['ips']}") +print(f"Application version: {info['application']}") + +# Reset HID +nanokvm.hid.reset_hid() +``` + + +## Character Support for paste_text() + +The `paste_text()` method supports a limited character set due to HID keyboard constraints: + +- Alphanumeric: `A-Z`, `a-z`, `0-9` +- Punctuation: `` `~!@#$%^&*()-_=+[]{}\|;:'",.<>/? `` +- Whitespace: Tab (`\t`), Newline (`\n`), Space +- Not supported: Extended Unicode, emoji, special control characters + + +## Related Documentation + +- [NanoKVM GitHub](https://github.com/sipeed/NanoKVM) +- [python-nanokvm Library](https://github.com/puddly/python-nanokvm) +- [Jumpstarter Documentation](https://jumpstarter.dev) diff --git a/packages/jumpstarter-driver-nanokvm/examples/exporter.yaml b/packages/jumpstarter-driver-nanokvm/examples/exporter.yaml new file mode 100644 index 00000000..f296fb3f --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/examples/exporter.yaml @@ -0,0 +1,19 @@ +apiVersion: jumpstarter.dev/v1alpha1 +kind: ExporterConfig +metadata: + namespace: default + name: demo +endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082 +token: "" +export: + nanokvm: + type: jumpstarter_driver_nanokvm.driver.NanoKVM + config: + host: "192.168.1.110" # or IP address like "192.168.1.100" + username: "admin" + password: "admin" + # Optional: Enable serial console access via SSH + # enable_serial: true + # ssh_username: "root" + # ssh_password: "root" + # ssh_port: 22 diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/__init__.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/__init__.py new file mode 100644 index 00000000..e37a7660 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/__init__.py @@ -0,0 +1,13 @@ +"""NanoKVM driver for Jumpstarter + +This package provides support for NanoKVM devices, including: +- Video streaming and snapshot capture +- Keyboard and mouse HID control +- Serial console access (optional) +""" + +from .driver import NanoKVM, NanoKVMHID, NanoKVMVideo + +__all__ = ["NanoKVM", "NanoKVMVideo", "NanoKVMHID"] + + diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py new file mode 100644 index 00000000..be603dad --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py @@ -0,0 +1,537 @@ +import io +from base64 import b64decode +from dataclasses import dataclass + +import click +from jumpstarter_driver_composite.client import CompositeClient +from nanokvm.models import MouseButton +from PIL import Image + +from jumpstarter.client import DriverClient +from jumpstarter.client.decorators import driver_click_group + +# Re-export MouseButton for convenience +__all__ = ["NanoKVMVideoClient", "NanoKVMHIDClient", "NanoKVMClient", "MouseButton"] + + +@dataclass(kw_only=True) +class NanoKVMVideoClient(DriverClient): + """ + Client interface for NanoKVM video streaming + + This client provides methods to access video stream and snapshots + from the NanoKVM device. + """ + + def snapshot(self, skip_frames: int = 3) -> Image.Image: + """ + Get a snapshot image from the video stream + + Returns: + PIL Image object of the snapshot + """ + input_jpg_data = b64decode(self.call("snapshot", skip_frames)) + return Image.open(io.BytesIO(input_jpg_data)) + + def cli(self): + @driver_click_group(self) + def base(): + """NanoKVM Video commands""" + pass + + @base.command() + @click.argument("output", type=click.Path(), default="snapshot.jpg") + def snapshot(output): + """Take a snapshot and save to file""" + image = self.snapshot() + image.save(output) + click.echo(f"Snapshot saved to {output}") + click.echo(f"Image size: {image.size[0]}x{image.size[1]}") + + return base + + +@dataclass(kw_only=True) +class NanoKVMHIDClient(DriverClient): + """ + Client interface for NanoKVM HID (Keyboard/Mouse) control + + This client provides methods to send keyboard and mouse events + to the device connected to the NanoKVM. + """ + + def paste_text(self, text: str): + """ + Paste text via keyboard HID simulation + + Args: + text: Text to paste. Supports limited character set: + alphanumeric, basic punctuation, and common symbols + + Example:: + + hid.paste_text("Hello, World!") + hid.paste_text("ls -la\\n") + """ + self.call("paste_text", text) + + def press_key(self, key: str): + """ + Press a key by pasting a single character + + Args: + key: Single character or escape sequence to press + + Example:: + + hid.press_key("a") # Type 'a' + hid.press_key("A") # Type 'A' + hid.press_key("\\n") # Press Enter + hid.press_key("\\t") # Press Tab + """ + self.call("press_key", key) + + def reset_hid(self): + """ + Reset the HID subsystem + + This can help recover from HID communication issues. + """ + self.call("reset_hid") + + def mouse_move_abs(self, x: float, y: float): + """ + Move mouse to absolute coordinates + + Args: + x: X coordinate (0.0 to 1.0, where 0.0 is left and 1.0 is right) + y: Y coordinate (0.0 to 1.0, where 0.0 is top and 1.0 is bottom) + + Example:: + + # Move to center of screen + hid.mouse_move_abs(0.5, 0.5) + + # Move to top-left corner + hid.mouse_move_abs(0.0, 0.0) + + # Move to bottom-right corner + hid.mouse_move_abs(1.0, 1.0) + """ + self.call("mouse_move_abs", x, y) + + def mouse_move_rel(self, dx: float, dy: float): + """ + Move mouse relative to current position + + Args: + dx: X movement delta (-1.0 to 1.0, where 1.0 is full screen width) + dy: Y movement delta (-1.0 to 1.0, where 1.0 is full screen height) + + Example:: + + # Move right by 10% of screen width and down by 10% + hid.mouse_move_rel(0.1, 0.1) + + # Move left by 20% + hid.mouse_move_rel(-0.2, 0.0) + """ + self.call("mouse_move_rel", dx, dy) + + def mouse_click(self, button: MouseButton | str = "left", x: float | None = None, y: float | None = None): + """ + Click a mouse button + + Args: + button: Mouse button to click (MouseButton enum or "left", "right", "middle" string) + x: Optional X coordinate (0.0 to 1.0) for absolute positioning before click + y: Optional Y coordinate (0.0 to 1.0) for absolute positioning before click + + Example:: + + # Using string (backward compatible) + hid.mouse_click("left") + hid.mouse_click("left", 0.5, 0.5) + + # Using MouseButton enum (recommended) + hid.mouse_click(MouseButton.LEFT) + hid.mouse_click(MouseButton.RIGHT, 0.75, 0.25) + """ + if x is not None and y is not None: + self.call("mouse_click", button, x, y) + else: + self.call("mouse_click", button, None, None) + + def mouse_scroll(self, dx: int, dy: int): + """ + Scroll the mouse wheel + + Args: + dx: Horizontal scroll amount + dy: Vertical scroll amount (positive=up, negative=down) + + Example:: + + # Scroll up + hid.mouse_scroll(0, 5) + + # Scroll down + hid.mouse_scroll(0, -5) + """ + self.call("mouse_scroll", dx, dy) + + def cli(self): # noqa: C901 + @driver_click_group(self) + def base(): + """NanoKVM HID (Keyboard/Mouse) commands""" + pass + + @base.command() + @click.argument("text") + def paste(text): + """Paste text via keyboard HID (supports \\n for newline, \\t for tab)""" + # Decode escape sequences like \n, \t, etc. + decoded_text = text.encode().decode("unicode_escape") + self.paste_text(decoded_text) + click.echo(f"Pasted: {repr(decoded_text)}") + + @base.command() + @click.argument("key") + def press(key): + """Press a single key (supports \\n for Enter, \\t for Tab)""" + # Decode escape sequences + decoded_key = key.encode().decode("unicode_escape") + self.press_key(decoded_key) + click.echo(f"Pressed: {repr(decoded_key)}") + + @base.command() + def reset(): + """Reset the HID subsystem""" + self.reset_hid() + click.echo("HID subsystem reset") + + @base.group() + def mouse(): + """Mouse control commands""" + pass + + @mouse.command() + @click.argument("x", type=float) + @click.argument("y", type=float) + def move(x, y): + """Move mouse to absolute coordinates (0.0-1.0)""" + self.mouse_move_abs(x, y) + click.echo(f"Mouse moved to ({x}, {y})") + + @mouse.command() + @click.argument("dx", type=float) + @click.argument("dy", type=float) + def move_rel(dx, dy): + """Move mouse by relative offset (-1.0 to 1.0, where 1.0 is full screen)""" + self.mouse_move_rel(dx, dy) + click.echo(f"Mouse moved by ({dx}, {dy})") + + @mouse.command(name="click") + @click.option("--button", "-b", default="left", type=click.Choice(["left", "right", "middle"])) + @click.option("--x", type=float, default=None, help="Optional X coordinate (0.0-1.0)") + @click.option("--y", type=float, default=None, help="Optional Y coordinate (0.0-1.0)") + def mouse_click_cmd(button, x, y): + """Click a mouse button""" + # Convert string to MouseButton enum + button_map = { + "left": MouseButton.LEFT, + "right": MouseButton.RIGHT, + "middle": MouseButton.MIDDLE, + } + button_enum = button_map[button] + self.mouse_click(button_enum, x, y) + if x is not None and y is not None: + click.echo(f"Clicked {button} button at ({x}, {y})") + else: + click.echo(f"Clicked {button} button") + + @mouse.command() + @click.option("--dx", type=int, default=0, help="Horizontal scroll") + @click.option("--dy", type=int, default=-5, help="Vertical scroll") + def scroll(dx, dy): + """Scroll the mouse wheel""" + self.mouse_scroll(dx, dy) + click.echo(f"Scrolled ({dx}, {dy})") + + return base + + +@dataclass(kw_only=True) +class NanoKVMClient(CompositeClient): + """ + Client interface for NanoKVM devices + + This composite client provides access to all NanoKVM functionality: + - video: Video streaming and snapshots + - hid: Keyboard and mouse control + - serial: Serial console access (if enabled) + + Example:: + + # Get a snapshot + image = nanokvm.video.snapshot() + + # Paste text + nanokvm.hid.paste_text("Hello from Jumpstarter!") + + # Get device info + info = nanokvm.get_info() + print(f"Device: {info['mdns']}") + """ + + def get_info(self) -> dict: + """ + Get device information + + Returns: + Dictionary containing device information: + - ips: List of IP addresses + - mdns: mDNS hostname + - image: Image version + - application: Application version + - device_key: Device key + """ + return self.call("get_info") + + def reboot(self): + """ + Reboot the NanoKVM device + + Warning: + This will reboot the NanoKVM itself, not the connected device. + The connection will be lost during reboot. + """ + self.call("reboot") + + def mount_image(self, file: str = "", cdrom: bool = False): + """ + Mount an image file or unmount if file is empty string + + Args: + file: Path to image file on the NanoKVM device, or empty string to unmount + cdrom: Whether to mount as CD-ROM (True) or disk (False) + + Note: + Unmounting may fail if image is currently in use. If unmount fails, + you may need to power cycle the connected device first. + + Example:: + + # Mount a disk image + nanokvm.mount_image("/path/to/disk.img", cdrom=False) + + # Mount a CD-ROM image + nanokvm.mount_image("/path/to/cdrom.iso", cdrom=True) + + # Unmount + nanokvm.mount_image("") or nanokvm.mount_image() + """ + self.call("mount_image", file, cdrom) + + def download_image(self, url: str) -> dict: + """ + Start downloading an image from a URL + + Args: + url: URL of the image to download + + Returns: + Dictionary with download status, file, and percentage + + Example:: + + status = nanokvm.download_image("https://example.com/image.iso") + print(f"Download: {status['status']}, File: {status['file']}, {status['percentage']}%") + """ + return self.call("download_image", url) + + def get_mounted_image(self) -> str | None: + """ + Get information about mounted image + + Returns: + String with mounted image file path, or None if no image mounted + + Example:: + + file = nanokvm.get_mounted_image() + if file: + print(f"Mounted: {file}") + """ + return self.call("get_mounted_image") + + def get_cdrom_status(self) -> bool: + """ + Check if the mounted image is in CD-ROM mode + + Returns: + Boolean indicating if CD-ROM mode is active (True=CD-ROM, False=disk) + + Example:: + + if nanokvm.get_cdrom_status(): + print("CD-ROM mode is enabled") + """ + return self.call("get_cdrom_status") + + def is_image_download_enabled(self) -> bool: + """ + Check if the /data partition allows image downloads + + Returns: + Boolean indicating if image downloads are enabled + + Example:: + + if nanokvm.is_image_download_enabled(): + print("Image downloads are available") + """ + return self.call("is_image_download_enabled") + + def get_image_download_status(self) -> dict: + """ + Get the status of an ongoing image download + + Returns: + Dictionary with download status, file, and percentage complete + + Example:: + + status = nanokvm.get_image_download_status() + print(f"Status: {status['status']}, File: {status['file']}, {status['percentage']}%") + """ + return self.call("get_image_download_status") + + def get_images(self) -> list[str]: + """ + Get the list of available image files + + Returns: + List of image file paths available on the NanoKVM device + + Example:: + + images = nanokvm.get_images() + for image in images: + print(f"Available: {image}") + """ + return self.call("get_images") + + def cli(self): # noqa: C901 + """Create CLI interface with device management and child commands""" + base = super().cli() + + @base.command() + def info(): + """Get device information""" + info = self.get_info() + click.echo("NanoKVM Device Information:") + click.echo(f" mDNS: {info['mdns']}") + click.echo(f" Image version: {info['image']}") + click.echo(f" Application version: {info['application']}") + click.echo(f" Device key: {info['device_key']}") + if info['ips']: + click.echo(" IP Addresses:") + for ip in info['ips']: + click.echo(f" - {ip['name']}: {ip['addr']} ({ip['type']}, {ip['version']})") + + @base.command() + @click.confirmation_option(prompt="Are you sure you want to reboot the NanoKVM device?") + def reboot(): + """Reboot the NanoKVM device""" + self.reboot() + click.echo("NanoKVM device is rebooting...") + + @base.group() + def image(): + """Image management commands""" + pass + + @image.command() + @click.argument("file") + @click.option("--cdrom", is_flag=True, help="Mount as CD-ROM instead of disk") + def mount(file, cdrom): + """Mount an image file""" + self.mount_image(file, cdrom) + image_type = "CD-ROM" if cdrom else "disk" + click.echo(f"Mounted {file} as {image_type}") + + @image.command() + def unmount(): + """Unmount the currently mounted image + + Note: Unmount may fail if image is in use by the connected device. + Power cycle the device first if unmount fails. + """ + try: + self.mount_image("") + click.echo("Image unmounted successfully") + except Exception as e: + click.echo(f"Failed to unmount image: {e}", err=True) + click.echo("Note: Image may be in use. Try power cycling the connected device first.", err=True) + raise + + @image.command() + @click.argument("url") + def download(url): + """Download an image from URL""" + status = self.download_image(url) + click.echo(f"Download started: {status['status']}") + if status['file']: + click.echo(f"File: {status['file']}") + if status['percentage']: + click.echo(f"Progress: {status['percentage']}%") + + @image.command() + def status(): + """Show mounted image status""" + file = self.get_mounted_image() + if file: + is_cdrom = self.get_cdrom_status() + mode = "CD-ROM" if is_cdrom else "Disk" + click.echo(f"Mounted: {file}") + click.echo(f"Mode: {mode}") + else: + click.echo("No image mounted") + + @image.command() + def cdrom_status(): + """Check if mounted image is in CD-ROM mode""" + is_cdrom = self.get_cdrom_status() + mode = "CD-ROM" if is_cdrom else "Disk" + click.echo(f"Current mode: {mode}") + + @image.command() + def download_enabled(): + """Check if image downloads are enabled""" + enabled = self.is_image_download_enabled() + status = "enabled" if enabled else "disabled" + click.echo(f"Image downloads: {status}") + + @image.command() + def download_status(): + """Get current image download status""" + status = self.get_image_download_status() + click.echo(f"Status: {status['status']}") + if status['file']: + click.echo(f"File: {status['file']}") + if status['percentage']: + click.echo(f"Progress: {status['percentage']}") + + @image.command() + def list(): + """List available image files""" + images = self.get_images() + if images: + click.echo("Available images:") + for img in images: + click.echo(f" - {img}") + else: + click.echo("No images available") + + return base diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py new file mode 100644 index 00000000..c58c93bf --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -0,0 +1,594 @@ +import asyncio +from base64 import b64encode +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from functools import wraps +from io import BytesIO + +import anyio +from aiohttp import ClientResponseError +from jumpstarter_driver_composite.driver import Composite +from jumpstarter_driver_pyserial.driver import PySerial +from nanokvm.client import NanoKVMClient as NanoKVMAPIClient +from nanokvm.models import MouseButton + +from jumpstarter.driver import Driver, export, exportstream + +# Re-export MouseButton for convenience +__all__ = ["NanoKVMVideo", "NanoKVMHID", "NanoKVMSerial", "NanoKVM", "MouseButton"] + + +def _is_unauthorized_error(error: Exception) -> bool: + """Check if an error is a 401 Unauthorized error""" + if isinstance(error, ClientResponseError): + return error.status == 401 + # Also check for string representation in case error is wrapped + error_str = str(error) + return "401" in error_str and ("Unauthorized" in error_str or "unauthorized" in error_str.lower()) + + +def with_reauth(func): + """Decorator to automatically re-authenticate on 401 errors""" + @wraps(func) + async def wrapper(self, *args, **kwargs): + try: + return await func(self, *args, **kwargs) + except Exception as e: + if _is_unauthorized_error(e): + self.logger.warning("Received 401 Unauthorized, re-authenticating...") + await self._reset_client() + # Retry once after re-authentication + return await func(self, *args, **kwargs) + raise + return wrapper + + +@dataclass(kw_only=True) +class NanoKVMVideo(Driver): + """NanoKVM Video Streaming driver""" + + host: str + username: str = "admin" + password: str = "admin" + + _client: NanoKVMAPIClient | None = field(init=False, repr=False, default=None) + _client_ctx: object = field(init=False, repr=False, default=None) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_nanokvm.client.NanoKVMVideoClient" + + async def _reset_client(self): + """Reset the client, forcing re-authentication""" + if self._client is not None: + try: + await self._client.close() + except Exception as e: + self.logger.debug(f"Error closing client during reset: {e}") + self._client = None + self._client_ctx = None + + async def _get_client(self) -> NanoKVMAPIClient: + """Get or create the NanoKVM API client using context manager""" + if self._client is None: + # Create a new client context manager + self._client_ctx = NanoKVMAPIClient(f"http://{self.host}/api/") + # Enter the context manager + self._client = await self._client_ctx.__aenter__() + # Authenticate + await self._client.authenticate(self.username, self.password) + return self._client + + def close(self): + """Clean up resources""" + # Schedule cleanup of client + if self._client_ctx is not None: + try: + anyio.from_thread.run(self._client_ctx.__aexit__(None, None, None)) + except Exception as e: + self.logger.debug(f"Error closing client: {e}") + + @export + @with_reauth + async def snapshot(self, skip_frames: int = 3) -> str: + """ + Take a snapshot from the video stream + + Returns: + Base64 encoded JPEG image data + """ + client = await self._get_client() + frame_count = 0 + async for frame in client.mjpeg_stream(): + frame_count += 1 + # Skip the first frames as it's normally stale + if frame_count < skip_frames: + continue + # Return the second (fresh) frame + buffer = BytesIO() + frame.save(buffer, format="JPEG") + data = buffer.getvalue() + self.logger.debug(f"snapshot: {len(data)} bytes") + return b64encode(data).decode("ascii") + raise RuntimeError("No frames available from video stream") + + @exportstream + @asynccontextmanager + async def stream(self): + """ + Stream video frames as JPEG images + + Yields a stream that provides JPEG image data + """ + self.logger.debug("Starting video stream") + client = await self._get_client() + + # Create a pair of connected streams + send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size=10) + + async def stream_video(): + try: + async with send_stream: + async for frame in client.mjpeg_stream(): + buffer = BytesIO() + frame.save(buffer, format="JPEG") + data = buffer.getvalue() + # TODO(mangelajo): this needs to be tested + await send_stream.send(data) + except Exception as e: + if _is_unauthorized_error(e): + self.logger.warning("Received 401 Unauthorized during stream, re-authenticating...") + await self._reset_client() + # Retry with new client + new_client = await self._get_client() + async for frame in new_client.mjpeg_stream(): + buffer = BytesIO() + frame.save(buffer, format="JPEG") + data = buffer.getvalue() + await send_stream.send(data) + else: + self.logger.error(f"Error streaming video: {e}") + raise + + # Start the video streaming task + task = asyncio.create_task(stream_video()) + + try: + yield receive_stream + finally: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +@dataclass(kw_only=True) +class NanoKVMHID(Driver): + """NanoKVM HID (Keyboard/Mouse) driver""" + + host: str + username: str = "admin" + password: str = "admin" + + _client: NanoKVMAPIClient | None = field(init=False, repr=False, default=None) + _client_ctx: object = field(init=False, repr=False, default=None) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_nanokvm.client.NanoKVMHIDClient" + + async def _reset_client(self): + """Reset the client, forcing re-authentication""" + if self._client is not None: + try: + await self._client.close() + except Exception as e: + self.logger.debug(f"Error closing client during reset: {e}") + self._client = None + self._client_ctx = None + + async def _get_client(self) -> NanoKVMAPIClient: + """Get or create the NanoKVM API client using context manager""" + if self._client is None: + # Create a new client context manager + self._client_ctx = NanoKVMAPIClient(f"http://{self.host}/api/") + # Enter the context manager + self._client = await self._client_ctx.__aenter__() + # Authenticate + await self._client.authenticate(self.username, self.password) + return self._client + + def close(self): + """Clean up resources""" + # Schedule cleanup of client + if self._client_ctx is not None: + try: + anyio.from_thread.run(self._client_ctx.__aexit__(None, None, None)) + except Exception as e: + self.logger.debug(f"Error closing client: {e}") + + @export + @with_reauth + async def paste_text(self, text: str): + """ + Paste text via keyboard HID simulation + + Args: + text: Text to paste (limited character set supported) + """ + client = await self._get_client() + await client.paste_text(text) + self.logger.info(f"Pasted text: {text}") + + @export + @with_reauth + async def press_key(self, key: str): + """ + Press a key by pasting a single character + + Args: + key: Single character or escape sequence to press (e.g., 'a', 'A', '\\n', '\\t') + + Note: + This uses paste_text under the hood, so it supports the same character set. + For special keys like Enter, use '\\n'. For Tab, use '\\t'. + """ + if len(key) > 2: # Allow for escape sequences like \n, \t + self.logger.warning(f"press_key should be used with single characters, got: {key}") + + client = await self._get_client() + await client.paste_text(key) + self.logger.debug(f"Pressed key: {repr(key)}") + + @export + @with_reauth + async def reset_hid(self): + """Reset the HID subsystem""" + client = await self._get_client() + await client.reset_hid() + self.logger.info("HID subsystem reset") + + @export + @with_reauth + async def mouse_move_abs(self, x: float, y: float): + """ + Move mouse to absolute coordinates + + Args: + x: X coordinate (0.0 to 1.0, where 0.0 is left/top and 1.0 is right/bottom) + y: Y coordinate (0.0 to 1.0, where 0.0 is left/top and 1.0 is right/bottom) + """ + client = await self._get_client() + await client.mouse_move_abs(x, y) + self.logger.debug(f"Mouse moved to absolute position: ({x}, {y})") + + @export + @with_reauth + async def mouse_move_rel(self, dx: float, dy: float): + """ + Move mouse relative to current position + + Args: + dx: X movement delta (-1.0 to 1.0, where 1.0 is full screen width) + dy: Y movement delta (-1.0 to 1.0, where 1.0 is full screen height) + """ + client = await self._get_client() + await client.mouse_move_rel(dx, dy) + self.logger.debug(f"Mouse moved by relative offset: ({dx}, {dy})") + + @export + @with_reauth + async def mouse_click(self, button: MouseButton | str = "left", x: float | None = None, y: float | None = None): + """ + Click a mouse button at current position or specified coordinates + + Args: + button: Mouse button to click (MouseButton enum or "left", "right", "middle" string) + x: Optional X coordinate (0.0 to 1.0) for absolute positioning before click + y: Optional Y coordinate (0.0 to 1.0) for absolute positioning before click + """ + # Convert string to MouseButton enum for backward compatibility + if isinstance(button, str): + button_map = { + "left": MouseButton.LEFT, + "right": MouseButton.RIGHT, + "middle": MouseButton.MIDDLE, + } + button = button_map.get(button.lower(), MouseButton.LEFT) + + client = await self._get_client() + await client.mouse_click(button, x, y) + self.logger.info(f"Mouse {button.name} clicked") + + @export + @with_reauth + async def mouse_scroll(self, dx: int, dy: int): + """ + Scroll the mouse wheel + + Args: + dx: Horizontal scroll amount + dy: Vertical scroll amount (positive=up, negative=down) + """ + client = await self._get_client() + await client.mouse_scroll(dx, dy) + self.logger.debug(f"Mouse scrolled: ({dx}, {dy})") + + +@dataclass(kw_only=True) +class NanoKVMSerial(PySerial): + """NanoKVM Serial console access via SSH tunnel""" + + nanokvm_host: str + nanokvm_username: str = "root" + nanokvm_password: str = "root" + nanokvm_ssh_port: int = 22 + + # PySerial will use the SSH tunnel + url: str = field(init=False) + + def __post_init__(self): + # Create an RFC2217 URL that will connect via SSH + # For now, we'll use a simple approach with a localhost tunnel + # This requires the user to set up SSH port forwarding manually + # or we can use paramiko to create the tunnel + self.url = "rfc2217://localhost:2217" + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_pyserial.client.PySerialClient" + + +@dataclass(kw_only=True) +class NanoKVM(Composite): + """ + Composite driver for NanoKVM devices + + This driver provides: + - Video streaming via the 'video' child driver + - HID (Keyboard/Mouse) control via the 'hid' child driver + - Serial console access via SSH tunnel (optional) + """ + + host: str + username: str = "admin" + password: str = "admin" + + # SSH access for serial console (optional) + ssh_username: str = "root" + ssh_password: str = "root" + ssh_port: int = 22 + + # Optional: provide serial console access + enable_serial: bool = False + + def __post_init__(self): + # Create child drivers + self.children = { + "video": NanoKVMVideo( + host=self.host, + username=self.username, + password=self.password, + ), + "hid": NanoKVMHID( + host=self.host, + username=self.username, + password=self.password, + ), + } + + super().__post_init__() + + # Optionally add serial console access + if self.enable_serial: + # Note: This is a placeholder - actual serial console access via SSH + # would require additional implementation in the nanokvm library + self.logger.warning("Serial console access not yet fully implemented") + + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_nanokvm.client.NanoKVMClient" + + @export + async def get_info(self): + """Get device information""" + # Get info from the video driver's client + video_driver = self.children["video"] + + def _format_info(info): + """Format device info into a dictionary""" + return { + "ips": [ + {"name": ip.name, "addr": ip.addr, "version": ip.version, "type": ip.type} + for ip in info.ips + ], + "mdns": info.mdns, + "image": info.image, + "application": info.application, + "device_key": info.device_key, + } + + try: + client = await video_driver._get_client() + info = await client.get_info() + return _format_info(info) + except Exception as e: + if _is_unauthorized_error(e): + self.logger.warning("Received 401 Unauthorized, re-authenticating...") + await video_driver._reset_client() + # Retry once after re-authentication + client = await video_driver._get_client() + info = await client.get_info() + return _format_info(info) + raise + + @export + async def reboot(self): + """Reboot the NanoKVM device""" + video_driver = self.children["video"] + + @with_reauth + async def _reboot_impl(driver): + client = await driver._get_client() + await client.reboot_system() + + await _reboot_impl(video_driver) + self.logger.info("NanoKVM device rebooted") + + @export + async def mount_image(self, file: str = "", cdrom: bool = False): + """ + Mount an image file or unmount if file is empty string + + Args: + file: Path to image file on the NanoKVM device, or empty string to unmount + cdrom: Whether to mount as CD-ROM (True) or disk (False) + """ + video_driver = self.children["video"] + + @with_reauth + async def _mount_impl(driver): + client = await driver._get_client() + # Pass empty string or None for unmount - API expects empty string + mount_file = file if file else "" + # When unmounting, we need to pass the file as empty string or None + await client.mount_image(file=mount_file or None, cdrom=cdrom if mount_file else False) + + await _mount_impl(video_driver) + if file: + self.logger.info(f"Mounted image: {file} (cdrom={cdrom})") + else: + self.logger.info("Unmounted image") + + @export + async def download_image(self, url: str): + """ + Start downloading an image from a URL + + Args: + url: URL of the image to download + + Returns: + Dictionary with download status information + """ + video_driver = self.children["video"] + + @with_reauth + async def _download_impl(driver): + client = await driver._get_client() + status = await client.download_image(url=url) + return { + "status": status.status, + "file": status.file, + "percentage": status.percentage, + } + + result = await _download_impl(video_driver) + self.logger.info(f"Started download from {url}") + return result + + @export + async def get_mounted_image(self): + """ + Get information about mounted image + + Returns: + String with mounted image file path, or None if no image mounted + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_mounted_impl(driver): + client = await driver._get_client() + info = await client.get_mounted_image() + return info.file + + return await _get_mounted_impl(video_driver) + + @export + async def get_cdrom_status(self): + """ + Check if the mounted image is in CD-ROM mode + + Returns: + Boolean indicating if CD-ROM mode is active (True=CD-ROM, False=disk) + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_cdrom_status_impl(driver): + client = await driver._get_client() + status = await client.get_cdrom_status() + return bool(status.cdrom) + + return await _get_cdrom_status_impl(video_driver) + + @export + async def is_image_download_enabled(self): + """ + Check if the /data partition allows image downloads + + Returns: + Boolean indicating if image downloads are enabled + """ + video_driver = self.children["video"] + + @with_reauth + async def _is_download_enabled_impl(driver): + client = await driver._get_client() + status = await client.is_image_download_enabled() + return status.enabled + + return await _is_download_enabled_impl(video_driver) + + @export + async def get_image_download_status(self): + """ + Get the status of an ongoing image download + + Returns: + Dictionary with download status, file, and percentage complete + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_download_status_impl(driver): + client = await driver._get_client() + status = await client.get_image_download_status() + return { + "status": status.status, + "file": status.file, + "percentage": status.percentage, + } + + return await _get_download_status_impl(video_driver) + + @export + async def get_images(self): + """ + Get the list of available image files + + Returns: + List of image file paths available on the NanoKVM device + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_images_impl(driver): + client = await driver._get_client() + images = await client.get_images() + return images.files + + return await _get_images_impl(video_driver) + diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py new file mode 100644 index 00000000..4ef863e4 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py @@ -0,0 +1,227 @@ +"""Tests for NanoKVM driver""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from PIL import Image + +from .driver import NanoKVM, NanoKVMHID, NanoKVMVideo +from jumpstarter.common.utils import serve + + +@pytest.fixture +def mock_nanokvm_client(): + """Create a mock NanoKVM API client""" + with patch("jumpstarter_driver_nanokvm.driver.NanoKVMAPIClient") as mock_client_class: + mock_client = AsyncMock() + + # Mock authentication + mock_client.authenticate = AsyncMock() + mock_client.logout = AsyncMock() + mock_client.close = AsyncMock() + + # Mock info + mock_info = MagicMock() + mock_info.ips = [] + mock_info.mdns = "nanokvm-test.local" + mock_info.image = "1.0.0" + mock_info.application = "1.0.0" + mock_info.device_key = "test-key" + mock_client.get_info = AsyncMock(return_value=mock_info) + + # Mock video streaming + test_image = Image.new("RGB", (640, 480), color="red") + + async def mock_stream(): + # Yield several frames - first ones are buffered/old, later ones are fresh + yield test_image + yield test_image + yield test_image + yield test_image + + mock_client.mjpeg_stream = mock_stream + + # Mock HID functions + mock_client.paste_text = AsyncMock() + mock_client.reset_hid = AsyncMock() + mock_client.mouse_move_abs = AsyncMock() + mock_client.mouse_click = AsyncMock() + + # Mock reboot + mock_client.reboot_system = AsyncMock() + + # Mock image management + mock_images = MagicMock() + mock_images.files = ["/data/alpine-standard-3.23.2-x86_64.iso", "/data/cs10-js.iso"] + mock_client.get_images = AsyncMock(return_value=mock_images) + + # Mock context manager behavior + mock_context = AsyncMock() + mock_context.__aenter__ = AsyncMock(return_value=mock_client) + mock_context.__aexit__ = AsyncMock(return_value=None) + mock_client_class.return_value = mock_context + + yield mock_client + + +@pytest.fixture +def mock_aiohttp_session(): + """Create a mock aiohttp ClientSession""" + with patch("aiohttp.ClientSession") as mock_session_class: + mock_session = AsyncMock() + mock_session.close = AsyncMock() + mock_session_class.return_value = mock_session + yield mock_session + + +def test_nanokvm_video_snapshot(mock_nanokvm_client, mock_aiohttp_session): + """Test video snapshot functionality""" + video = NanoKVMVideo(host="test.local", username="admin", password="admin") + + with serve(video) as client: + # Get a snapshot + image = client.snapshot() + + # Verify it's a PIL Image + assert isinstance(image, Image.Image) + assert image.size == (640, 480) + + +def test_nanokvm_hid_paste(mock_nanokvm_client, mock_aiohttp_session): + """Test HID paste text functionality""" + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Paste some text + client.paste_text("Hello, World!") + + # Verify the mock was called + mock_nanokvm_client.paste_text.assert_called_once_with("Hello, World!") + + +def test_nanokvm_hid_reset(mock_nanokvm_client, mock_aiohttp_session): + """Test HID reset functionality""" + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Reset HID + client.reset_hid() + + # Verify the mock was called + mock_nanokvm_client.reset_hid.assert_called_once() + + +def test_nanokvm_hid_press_key(mock_nanokvm_client, mock_aiohttp_session): + """Test key press functionality""" + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Press a key + client.press_key("a") + + # Verify paste_text was called with the character + mock_nanokvm_client.paste_text.assert_called_with("a") + + +def test_nanokvm_composite(mock_nanokvm_client, mock_aiohttp_session): + """Test composite NanoKVM driver""" + driver = NanoKVM( + host="test.local", + username="admin", + password="admin", + ) + + with serve(driver) as client: + # Test that children are accessible + assert hasattr(client, "video") + assert hasattr(client, "hid") + + # Test video snapshot through composite + image = client.video.snapshot() + assert isinstance(image, Image.Image) + + # Test HID paste through composite + client.hid.paste_text("Test") + mock_nanokvm_client.paste_text.assert_called_with("Test") + + # Test get_info + info = client.get_info() + assert "mdns" in info + assert info["mdns"] == "nanokvm-test.local" + + +def test_nanokvm_reboot(mock_nanokvm_client, mock_aiohttp_session): + """Test NanoKVM reboot functionality""" + driver = NanoKVM( + host="test.local", + username="admin", + password="admin", + ) + + with serve(driver) as client: + # Test reboot + client.reboot() + mock_nanokvm_client.reboot_system.assert_called_once() + + +def test_nanokvm_video_client_creation(): + """Test that NanoKVMVideo returns correct client class""" + assert NanoKVMVideo.client() == "jumpstarter_driver_nanokvm.client.NanoKVMVideoClient" + + +def test_nanokvm_hid_client_creation(): + """Test that NanoKVMHID returns correct client class""" + assert NanoKVMHID.client() == "jumpstarter_driver_nanokvm.client.NanoKVMHIDClient" + + +def test_nanokvm_client_creation(): + """Test that NanoKVM returns correct client class""" + assert NanoKVM.client() == "jumpstarter_driver_nanokvm.client.NanoKVMClient" + + +def test_nanokvm_mouse_move_abs(mock_nanokvm_client, mock_aiohttp_session): + """Test mouse absolute movement""" + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Move mouse to absolute position (normalized 0.0-1.0 coordinates) + client.mouse_move_abs(0.5, 0.5) + + # Verify the mock was called + mock_nanokvm_client.mouse_move_abs.assert_called_once_with(0.5, 0.5) + + +def test_nanokvm_mouse_click(mock_nanokvm_client, mock_aiohttp_session): + """Test mouse click""" + from nanokvm.models import MouseButton + + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Click left button + client.mouse_click("left") + + # Verify the mock was called + mock_nanokvm_client.mouse_click.assert_called_once_with(MouseButton.LEFT, None, None) + + +def test_nanokvm_get_images(mock_nanokvm_client, mock_aiohttp_session): + """Test getting list of available images""" + driver = NanoKVM( + host="test.local", + username="admin", + password="admin", + ) + + with serve(driver) as client: + # Get list of images + images = client.get_images() + + # Verify the result + assert isinstance(images, list) + assert len(images) == 2 + assert "/data/alpine-standard-3.23.2-x86_64.iso" in images + assert "/data/cs10-js.iso" in images + + # Verify the mock was called + mock_nanokvm_client.get_images.assert_called_once() diff --git a/packages/jumpstarter-driver-nanokvm/pyproject.toml b/packages/jumpstarter-driver-nanokvm/pyproject.toml new file mode 100644 index 00000000..75069323 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/pyproject.toml @@ -0,0 +1,56 @@ +[project] +name = "jumpstarter-driver-nanokvm" +dynamic = ["version", "urls"] +description = "Jumpstarter driver for NanoKVM devices providing video streaming, keyboard/mouse control, and serial console access" +readme = "README.md" +license = "Apache-2.0" +authors = [ + { name = "Miguel Angel Ajo", email = "miguelangel@ajo.es" } +] +requires-python = ">=3.11" +dependencies = [ + "anyio>=4.10.0", + "jumpstarter", + "jumpstarter-driver-composite", + "jumpstarter-driver-pyserial", + "nanokvm @ git+https://github.com/mangelajo/python-nanokvm.git@dev", + "aiohttp", + "pillow", + "pydantic", + "yarl", + "click", +] + +[project.entry-points."jumpstarter.drivers"] +NanoKVM = "jumpstarter_driver_nanokvm.driver:NanoKVM" + +[tool.hatch.version] +source = "vcs" +raw-options = { 'root' = '../../'} + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.hatch.metadata.hooks.vcs.urls] +Homepage = "https://jumpstarter.dev" +source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" + +[tool.pytest.ini_options] +addopts = "--cov --cov-report=html --cov-report=xml" +log_cli = true +log_cli_level = "INFO" +testpaths = ["jumpstarter_driver_nanokvm"] +asyncio_mode = "auto" + +[build-system] +requires = ["hatchling", "hatch-vcs", "hatch-pin-jumpstarter"] +build-backend = "hatchling.build" + +[tool.hatch.build.hooks.pin_jumpstarter] +name = "pin_jumpstarter" + +[dependency-groups] +dev = [ + "pytest-cov>=6.0.0", + "pytest>=8.3.3", +] diff --git a/pyproject.toml b/pyproject.toml index e6a171ec..4a360acc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ jumpstarter-driver-http-power = { workspace = true } jumpstarter-driver-gpiod = { workspace = true } jumpstarter-driver-ridesx = { workspace = true } jumpstarter-driver-network = { workspace = true } +jumpstarter-driver-nanokvm = { workspace = true } jumpstarter-driver-opendal = { workspace = true } jumpstarter-driver-power = { workspace = true } jumpstarter-driver-probe-rs = { workspace = true } diff --git a/uv.lock b/uv.lock index 77b11a50..8d16656e 100644 --- a/uv.lock +++ b/uv.lock @@ -22,6 +22,7 @@ members = [ "jumpstarter-driver-http", "jumpstarter-driver-http-power", "jumpstarter-driver-iscsi", + "jumpstarter-driver-nanokvm", "jumpstarter-driver-network", "jumpstarter-driver-opendal", "jumpstarter-driver-power", @@ -1778,6 +1779,48 @@ dev = [ { name = "pytest-cov", specifier = ">=6.0.0" }, ] +[[package]] +name = "jumpstarter-driver-nanokvm" +source = { editable = "packages/jumpstarter-driver-nanokvm" } +dependencies = [ + { name = "aiohttp" }, + { name = "anyio" }, + { name = "click" }, + { name = "jumpstarter" }, + { name = "jumpstarter-driver-composite" }, + { name = "jumpstarter-driver-pyserial" }, + { name = "nanokvm" }, + { name = "pillow" }, + { name = "pydantic" }, + { name = "yarl" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-cov" }, +] + +[package.metadata] +requires-dist = [ + { name = "aiohttp" }, + { name = "anyio", specifier = ">=4.10.0" }, + { name = "click" }, + { name = "jumpstarter", editable = "packages/jumpstarter" }, + { name = "jumpstarter-driver-composite", editable = "packages/jumpstarter-driver-composite" }, + { name = "jumpstarter-driver-pyserial", editable = "packages/jumpstarter-driver-pyserial" }, + { name = "nanokvm", git = "https://github.com/mangelajo/python-nanokvm.git?rev=dev" }, + { name = "pillow" }, + { name = "pydantic" }, + { name = "yarl" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=8.3.3" }, + { name = "pytest-cov", specifier = ">=6.0.0" }, +] + [[package]] name = "jumpstarter-driver-network" source = { editable = "packages/jumpstarter-driver-network" } @@ -2817,6 +2860,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5f/df/76d0321c3797b54b60fef9ec3bd6f4cfd124b9e422182156a1dd418722cf/myst_parser-4.0.1-py3-none-any.whl", hash = "sha256:9134e88959ec3b5780aedf8a99680ea242869d012e8821db3126d427edc9c95d", size = 84579, upload-time = "2025-02-12T10:53:02.078Z" }, ] +[[package]] +name = "nanokvm" +version = "0.0.1" +source = { git = "https://github.com/mangelajo/python-nanokvm.git?rev=dev#7cbf92f3ebe2d100a25d07b383ac177f745fa668" } +dependencies = [ + { name = "aiohttp" }, + { name = "cryptography" }, + { name = "paramiko" }, + { name = "pillow" }, + { name = "pydantic" }, + { name = "yarl" }, +] + [[package]] name = "nodeenv" version = "1.9.1"