diff --git a/openhands-sdk/openhands/sdk/plugin/__init__.py b/openhands-sdk/openhands/sdk/plugin/__init__.py index cf6d6db6e1..5bfb25a026 100644 --- a/openhands-sdk/openhands/sdk/plugin/__init__.py +++ b/openhands-sdk/openhands/sdk/plugin/__init__.py @@ -4,6 +4,8 @@ skills, hooks, MCP configurations, agents, and commands together. """ +from openhands.sdk.plugin.fetch import PluginFetchError, parse_plugin_source +from openhands.sdk.plugin.git_helper import GitError, GitHelper from openhands.sdk.plugin.plugin import Plugin from openhands.sdk.plugin.types import ( AgentDefinition, @@ -15,8 +17,12 @@ __all__ = [ "Plugin", + "PluginFetchError", "PluginManifest", "PluginAuthor", "AgentDefinition", "CommandDefinition", + "parse_plugin_source", + "GitHelper", + "GitError", ] diff --git a/openhands-sdk/openhands/sdk/plugin/fetch.py b/openhands-sdk/openhands/sdk/plugin/fetch.py new file mode 100644 index 0000000000..18ab913e71 --- /dev/null +++ b/openhands-sdk/openhands/sdk/plugin/fetch.py @@ -0,0 +1,349 @@ +"""Plugin fetching utilities for remote plugin sources.""" + +from __future__ import annotations + +import hashlib +import re +import shutil +from pathlib import Path + +from openhands.sdk.logger import get_logger +from openhands.sdk.plugin.git_helper import GitError, GitHelper + + +logger = get_logger(__name__) + +DEFAULT_CACHE_DIR = Path.home() / ".openhands" / "cache" / "plugins" + +# Default GitHelper instance - can be replaced for testing +_default_git_helper: GitHelper | None = None + + +def get_git_helper() -> GitHelper: + """Get the default GitHelper instance.""" + global _default_git_helper + if _default_git_helper is None: + _default_git_helper = GitHelper() + return _default_git_helper + + +def set_git_helper(helper: GitHelper | None) -> None: + """Set the default GitHelper instance (for testing).""" + global _default_git_helper + _default_git_helper = helper + + +class PluginFetchError(Exception): + """Raised when fetching a plugin fails.""" + + pass + + +def parse_plugin_source(source: str) -> tuple[str, str]: + """Parse plugin source into (type, url). + + Args: + source: Plugin source string. Can be: + - "github:owner/repo" - GitHub repository shorthand + - "https://github.com/owner/repo.git" - Full git URL + - "git@github.com:owner/repo.git" - SSH git URL + - "/local/path" - Local path + + Returns: + Tuple of (source_type, normalized_url) where source_type is one of: + - "github": GitHub repository + - "git": Any git URL + - "local": Local filesystem path + + Examples: + >>> parse_plugin_source("github:owner/repo") + ("github", "https://github.com/owner/repo.git") + >>> parse_plugin_source("https://gitlab.com/org/repo.git") + ("git", "https://gitlab.com/org/repo.git") + >>> parse_plugin_source("/local/path") + ("local", "/local/path") + """ + source = source.strip() + + # GitHub shorthand: github:owner/repo + if source.startswith("github:"): + repo_path = source[7:] # Remove "github:" prefix + # Validate format + if "/" not in repo_path or repo_path.count("/") > 1: + raise PluginFetchError( + f"Invalid GitHub shorthand format: {source}. " + f"Expected format: github:owner/repo" + ) + url = f"https://github.com/{repo_path}.git" + return ("github", url) + + # Git URL patterns + git_url_patterns = [ + r"^https?://.*\.git$", # HTTPS with .git suffix + r"^https?://github\.com/", # GitHub HTTPS (may not have .git) + r"^https?://gitlab\.com/", # GitLab HTTPS + r"^https?://bitbucket\.org/", # Bitbucket HTTPS + r"^git@.*:.*\.git$", # SSH format + r"^git://", # Git protocol + r"^file://", # Local file:// URLs (for testing) + ] + + for pattern in git_url_patterns: + if re.match(pattern, source): + # Normalize: ensure .git suffix for HTTPS URLs + url = source + if url.startswith("https://") and not url.endswith(".git"): + # Remove trailing slash if present + url = url.rstrip("/") + url = f"{url}.git" + return ("git", url) + + # Local path + if source.startswith("/") or source.startswith("~") or source.startswith("."): + return ("local", source) + + # If it looks like a path (contains path separators but no URL scheme) + if "/" in source and "://" not in source and not source.startswith("github:"): + # Could be a relative path + return ("local", source) + + raise PluginFetchError( + f"Unable to parse plugin source: {source}. " + f"Expected formats: 'github:owner/repo', git URL, or local path" + ) + + +def get_cache_path(source: str, cache_dir: Path | None = None) -> Path: + """Get the cache path for a plugin source. + + Creates a deterministic path based on a hash of the source URL. + + Args: + source: The plugin source (URL or path). + cache_dir: Base cache directory. Defaults to ~/.openhands/cache/plugins/ + + Returns: + Path where the plugin should be cached. + """ + if cache_dir is None: + cache_dir = DEFAULT_CACHE_DIR + + # Create a hash of the source for the directory name + source_hash = hashlib.sha256(source.encode()).hexdigest()[:16] + + # Also include a readable portion of the source + # Extract repo name from various formats + readable_name = _extract_readable_name(source) + + cache_name = f"{readable_name}-{source_hash}" + return cache_dir / cache_name + + +def _extract_readable_name(source: str) -> str: + """Extract a human-readable name from a source URL/path. + + Args: + source: Plugin source string. + + Returns: + A sanitized, readable name for the cache directory. + """ + # Remove common prefixes and suffixes + name = source + + # Handle github: prefix + if name.startswith("github:"): + name = name[7:] + + # Handle URLs + if "://" in name: + # Remove protocol + name = name.split("://", 1)[1] + + # Handle SSH format (git@github.com:owner/repo.git) + if name.startswith("git@"): + name = name.split(":", 1)[1] if ":" in name else name + + # Remove .git suffix + if name.endswith(".git"): + name = name[:-4] + + # Get the last component (repo name) + if "/" in name: + parts = name.rstrip("/").split("/") + # For owner/repo format, use repo name + name = parts[-1] if parts else name + + # Sanitize: only allow alphanumeric, dash, underscore + name = re.sub(r"[^a-zA-Z0-9_-]", "-", name) + name = re.sub(r"-+", "-", name) # Collapse multiple dashes + name = name.strip("-") + + # Limit length + return name[:32] if name else "plugin" + + +def fetch_plugin( + source: str, + cache_dir: Path | None = None, + ref: str | None = None, + update: bool = True, + subpath: str | None = None, + git_helper: GitHelper | None = None, +) -> Path: + """Fetch a plugin from a remote source and return the local cached path. + + Args: + source: Plugin source - can be: + - "github:owner/repo" - GitHub repository shorthand + - "https://github.com/owner/repo.git" - Full git URL + - "/local/path" - Local path (returned as-is) + cache_dir: Directory for caching. Defaults to ~/.openhands/cache/plugins/ + ref: Optional branch, tag, or commit to checkout. + update: If True and cache exists, update it. If False, use cached version as-is. + subpath: Optional subdirectory path within the repo. If specified, the returned + path will point to this subdirectory instead of the repository root. + git_helper: GitHelper instance (for testing). Defaults to global instance. + + Returns: + Path to the local plugin directory (ready for Plugin.load()). + If subpath is specified, returns the path to that subdirectory. + + Raises: + PluginFetchError: If fetching fails or subpath doesn't exist. + """ + source_type, url = parse_plugin_source(source) + + # Local paths are returned as-is + if source_type == "local": + local_path = Path(url).expanduser().resolve() + if not local_path.exists(): + raise PluginFetchError(f"Local plugin path does not exist: {local_path}") + # Apply subpath for local paths too + if subpath: + final_path = local_path / subpath.strip("/") + if not final_path.exists(): + raise PluginFetchError( + f"Subdirectory '{subpath}' not found in local plugin path" + ) + return final_path + return local_path + + # Get git helper + git = git_helper or get_git_helper() + + # Get cache path + if cache_dir is None: + cache_dir = DEFAULT_CACHE_DIR + + plugin_path = get_cache_path(url, cache_dir) + + # Ensure cache directory exists + cache_dir.mkdir(parents=True, exist_ok=True) + + try: + if plugin_path.exists() and (plugin_path / ".git").exists(): + if update: + _update_repository(plugin_path, ref, git) + else: + logger.debug(f"Using cached plugin at {plugin_path}") + if ref: + _checkout_ref(plugin_path, ref, git) + else: + _clone_repository(url, plugin_path, ref, git) + + # Apply subpath if specified + if subpath: + final_path = plugin_path / subpath.strip("/") + if not final_path.exists(): + raise PluginFetchError( + f"Subdirectory '{subpath}' not found in plugin repository" + ) + return final_path + + return plugin_path + + except GitError as e: + raise PluginFetchError(f"Git operation failed: {e}") from e + except PluginFetchError: + raise + except Exception as e: + raise PluginFetchError(f"Failed to fetch plugin from {source}: {e}") from e + + +def _clone_repository(url: str, dest: Path, ref: str | None, git: GitHelper) -> None: + """Clone a git repository. + + Args: + url: Git URL to clone. + dest: Destination path. + ref: Optional branch/tag to checkout. + git: GitHelper instance. + """ + logger.info(f"Cloning plugin from {url}") + + # Remove existing directory if it exists (but isn't a valid git repo) + if dest.exists(): + shutil.rmtree(dest) + + git.clone(url, dest, depth=1, branch=ref) + + logger.debug(f"Plugin cloned to {dest}") + + +def _update_repository(repo_path: Path, ref: str | None, git: GitHelper) -> None: + """Update an existing repository. + + Args: + repo_path: Path to the repository. + ref: Optional branch/tag to checkout. + git: GitHelper instance. + """ + logger.debug(f"Updating plugin repository at {repo_path}") + + try: + # Fetch latest changes + git.fetch(repo_path) + + if ref: + _checkout_ref(repo_path, ref, git) + else: + # Get the current branch + current_branch = git.get_current_branch(repo_path) + + if current_branch: + # Reset to latest on current branch + git.reset_hard(repo_path, f"origin/{current_branch}") + + logger.debug("Plugin repository updated successfully") + + except GitError as e: + logger.warning( + f"Failed to update repository: {e}, using existing cached version" + ) + + +def _checkout_ref(repo_path: Path, ref: str, git: GitHelper) -> None: + """Checkout a specific ref (branch, tag, or commit). + + Args: + repo_path: Path to the repository. + ref: Branch, tag, or commit to checkout. + git: GitHelper instance. + """ + logger.debug(f"Checking out ref: {ref}") + + # First try to fetch the ref + try: + git.fetch(repo_path, ref=ref) + except GitError: + pass # May fail for commits, that's ok + + # Checkout the ref + git.checkout(repo_path, ref) + + # If it's a branch, reset to origin + try: + git.reset_hard(repo_path, f"origin/{ref}") + except GitError: + pass # May fail for tags/commits, that's ok diff --git a/openhands-sdk/openhands/sdk/plugin/git_helper.py b/openhands-sdk/openhands/sdk/plugin/git_helper.py new file mode 100644 index 0000000000..c36f18265a --- /dev/null +++ b/openhands-sdk/openhands/sdk/plugin/git_helper.py @@ -0,0 +1,196 @@ +"""Git operations helper for plugin fetching.""" + +from __future__ import annotations + +import subprocess +from pathlib import Path + +from openhands.sdk.logger import get_logger + + +logger = get_logger(__name__) + + +class GitError(Exception): + """Raised when a git operation fails.""" + + pass + + +class GitHelper: + """Abstraction for git operations, enabling easy mocking in tests.""" + + def clone( + self, + url: str, + dest: Path, + depth: int | None = 1, + branch: str | None = None, + timeout: int = 120, + ) -> None: + """Clone a git repository. + + Args: + url: Git URL to clone. + dest: Destination path. + depth: Clone depth (None for full clone). + branch: Branch/tag to checkout. + timeout: Timeout in seconds. + + Raises: + GitError: If clone fails. + """ + cmd = ["git", "clone"] + + if depth is not None: + cmd.extend(["--depth", str(depth)]) + + if branch: + cmd.extend(["--branch", branch]) + + cmd.extend([url, str(dest)]) + + logger.debug(f"Running: {' '.join(cmd)}") + + try: + subprocess.run( + cmd, + check=True, + capture_output=True, + timeout=timeout, + ) + except subprocess.CalledProcessError as e: + stderr = e.stderr.decode() if e.stderr else str(e) + raise GitError(f"Clone failed: {stderr}") from e + except subprocess.TimeoutExpired as e: + raise GitError(f"Clone timed out after {timeout}s") from e + + def fetch( + self, + repo_path: Path, + remote: str = "origin", + ref: str | None = None, + timeout: int = 60, + ) -> None: + """Fetch from remote. + + Args: + repo_path: Path to the repository. + remote: Remote name. + ref: Specific ref to fetch (optional). + timeout: Timeout in seconds. + + Raises: + GitError: If fetch fails. + """ + cmd = ["git", "fetch", remote] + if ref: + cmd.append(ref) + + logger.debug(f"Running: {' '.join(cmd)} in {repo_path}") + + try: + subprocess.run( + cmd, + cwd=repo_path, + check=True, + capture_output=True, + timeout=timeout, + ) + except subprocess.CalledProcessError as e: + stderr = e.stderr.decode() if e.stderr else str(e) + raise GitError(f"Fetch failed: {stderr}") from e + except subprocess.TimeoutExpired as e: + raise GitError(f"Fetch timed out after {timeout}s") from e + + def checkout(self, repo_path: Path, ref: str, timeout: int = 30) -> None: + """Checkout a ref (branch, tag, or commit). + + Args: + repo_path: Path to the repository. + ref: Branch, tag, or commit to checkout. + timeout: Timeout in seconds. + + Raises: + GitError: If checkout fails. + """ + cmd = ["git", "checkout", ref] + + logger.debug(f"Running: {' '.join(cmd)} in {repo_path}") + + try: + subprocess.run( + cmd, + cwd=repo_path, + check=True, + capture_output=True, + timeout=timeout, + ) + except subprocess.CalledProcessError as e: + stderr = e.stderr.decode() if e.stderr else str(e) + raise GitError(f"Checkout failed: {stderr}") from e + except subprocess.TimeoutExpired as e: + raise GitError(f"Checkout timed out after {timeout}s") from e + + def reset_hard(self, repo_path: Path, ref: str, timeout: int = 30) -> None: + """Hard reset to a ref. + + Args: + repo_path: Path to the repository. + ref: Ref to reset to. + timeout: Timeout in seconds. + + Raises: + GitError: If reset fails. + """ + cmd = ["git", "reset", "--hard", ref] + + logger.debug(f"Running: {' '.join(cmd)} in {repo_path}") + + try: + subprocess.run( + cmd, + cwd=repo_path, + check=True, + capture_output=True, + timeout=timeout, + ) + except subprocess.CalledProcessError as e: + stderr = e.stderr.decode() if e.stderr else str(e) + raise GitError(f"Reset failed: {stderr}") from e + except subprocess.TimeoutExpired as e: + raise GitError(f"Reset timed out after {timeout}s") from e + + def get_current_branch(self, repo_path: Path, timeout: int = 10) -> str | None: + """Get the current branch name. + + Args: + repo_path: Path to the repository. + timeout: Timeout in seconds. + + Returns: + Branch name, or None if in detached HEAD state. + + Raises: + GitError: If command fails. + """ + cmd = ["git", "rev-parse", "--abbrev-ref", "HEAD"] + + logger.debug(f"Running: {' '.join(cmd)} in {repo_path}") + + try: + result = subprocess.run( + cmd, + cwd=repo_path, + capture_output=True, + text=True, + timeout=timeout, + ) + branch = result.stdout.strip() + # "HEAD" means detached HEAD state + return None if branch == "HEAD" else branch + except subprocess.CalledProcessError as e: + stderr = e.stderr if e.stderr else str(e) + raise GitError(f"Failed to get current branch: {stderr}") from e + except subprocess.TimeoutExpired as e: + raise GitError(f"Get branch timed out after {timeout}s") from e diff --git a/openhands-sdk/openhands/sdk/plugin/plugin.py b/openhands-sdk/openhands/sdk/plugin/plugin.py index 21f4d3d56c..df5fee2791 100644 --- a/openhands-sdk/openhands/sdk/plugin/plugin.py +++ b/openhands-sdk/openhands/sdk/plugin/plugin.py @@ -16,6 +16,7 @@ ) from openhands.sdk.hooks import HookConfig from openhands.sdk.logger import get_logger +from openhands.sdk.plugin.fetch import fetch_plugin from openhands.sdk.plugin.types import ( AgentDefinition, CommandDefinition, @@ -83,6 +84,59 @@ def description(self) -> str: """Get the plugin description.""" return self.manifest.description + @classmethod + def fetch( + cls, + source: str, + cache_dir: Path | None = None, + ref: str | None = None, + update: bool = True, + subpath: str | None = None, + ) -> Path: + """Fetch a plugin from a remote source and return the local cached path. + + This method fetches plugins from remote sources (GitHub repositories, git URLs) + and caches them locally. Use the returned path with Plugin.load() to load + the plugin. + + Args: + source: Plugin source - can be: + - "github:owner/repo" - GitHub repository shorthand + - "https://github.com/owner/repo.git" - Full git URL + - "/local/path" - Local path (returned as-is) + cache_dir: Directory for caching. Defaults to ~/.openhands/cache/plugins/ + ref: Optional branch, tag, or commit to checkout. + update: If True and cache exists, update it. If False, use cached as-is. + subpath: Optional subdirectory path within the repo. If specified, the + returned path will point to this subdirectory instead of the + repository root. + + Returns: + Path to the local plugin directory (ready for Plugin.load()). + If subpath is specified, returns the path to that subdirectory. + + Raises: + PluginFetchError: If fetching fails or subpath doesn't exist. + + Example: + >>> path = Plugin.fetch("github:owner/my-plugin") + >>> plugin = Plugin.load(path) + + >>> # With specific version + >>> path = Plugin.fetch("github:owner/my-plugin", ref="v1.0.0") + >>> plugin = Plugin.load(path) + + >>> # Fetch a plugin from a subdirectory + >>> path = Plugin.fetch("github:owner/monorepo", subpath="plugins/sub") + >>> plugin = Plugin.load(path) + + >>> # Fetch and load in one step + >>> plugin = Plugin.load(Plugin.fetch("github:owner/my-plugin")) + """ + return fetch_plugin( + source, cache_dir=cache_dir, ref=ref, update=update, subpath=subpath + ) + @classmethod def load(cls, plugin_path: str | Path) -> Plugin: """Load a plugin from a directory. diff --git a/pyproject.toml b/pyproject.toml index 2f1c150489..ec9ee98ca7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,6 +79,24 @@ python_classes = ["Test*"] python_functions = ["test_*"] addopts = "-v --tb=short" asyncio_mode = "auto" +markers = [ + "forked: marks tests as requiring process isolation (deselect with '-m \"not forked\"')", +] + +# Coverage configuration - needed for --forked tests to collect coverage data +[tool.coverage.run] +branch = true +parallel = true +concurrency = ["multiprocessing", "thread"] +sigterm = true +source = ["openhands-sdk", "openhands-tools", "openhands-workspace", "openhands-agent-server"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "if TYPE_CHECKING:", + "raise NotImplementedError", +] # Pyright configuration for PEP 420 namespace packages # This is needed for VSCode to properly resolve imports across multiple packages in the monorepo diff --git a/tests/sdk/plugin/test_plugin_fetch.py b/tests/sdk/plugin/test_plugin_fetch.py new file mode 100644 index 0000000000..5e6b764763 --- /dev/null +++ b/tests/sdk/plugin/test_plugin_fetch.py @@ -0,0 +1,776 @@ +"""Tests for Plugin.fetch() functionality.""" + +import subprocess +from pathlib import Path +from unittest.mock import create_autospec, patch + +import pytest + +from openhands.sdk.plugin import ( + GitError, + GitHelper, + Plugin, + PluginFetchError, + parse_plugin_source, +) +from openhands.sdk.plugin.fetch import ( + _checkout_ref, + _clone_repository, + _extract_readable_name, + _update_repository, + fetch_plugin, + get_cache_path, + get_git_helper, + set_git_helper, +) + + +class TestParsePluginSource: + """Tests for parse_plugin_source function.""" + + def test_github_shorthand(self): + """Test parsing GitHub shorthand format.""" + source_type, url = parse_plugin_source("github:owner/repo") + assert source_type == "github" + assert url == "https://github.com/owner/repo.git" + + def test_github_shorthand_with_whitespace(self): + """Test parsing GitHub shorthand with leading/trailing whitespace.""" + source_type, url = parse_plugin_source(" github:owner/repo ") + assert source_type == "github" + assert url == "https://github.com/owner/repo.git" + + def test_github_shorthand_invalid_format(self): + """Test that invalid GitHub shorthand raises error.""" + with pytest.raises(PluginFetchError, match="Invalid GitHub shorthand"): + parse_plugin_source("github:invalid") + + with pytest.raises(PluginFetchError, match="Invalid GitHub shorthand"): + parse_plugin_source("github:too/many/parts") + + def test_https_git_url(self): + """Test parsing HTTPS git URLs.""" + source_type, url = parse_plugin_source("https://github.com/owner/repo.git") + assert source_type == "git" + assert url == "https://github.com/owner/repo.git" + + def test_https_github_url_without_git_suffix(self): + """Test parsing GitHub HTTPS URL without .git suffix.""" + source_type, url = parse_plugin_source("https://github.com/owner/repo") + assert source_type == "git" + assert url == "https://github.com/owner/repo.git" + + def test_https_github_url_with_trailing_slash(self): + """Test parsing GitHub HTTPS URL with trailing slash.""" + source_type, url = parse_plugin_source("https://github.com/owner/repo/") + assert source_type == "git" + assert url == "https://github.com/owner/repo.git" + + def test_https_gitlab_url(self): + """Test parsing GitLab HTTPS URLs.""" + source_type, url = parse_plugin_source("https://gitlab.com/org/repo") + assert source_type == "git" + assert url == "https://gitlab.com/org/repo.git" + + def test_https_bitbucket_url(self): + """Test parsing Bitbucket HTTPS URLs.""" + source_type, url = parse_plugin_source("https://bitbucket.org/org/repo") + assert source_type == "git" + assert url == "https://bitbucket.org/org/repo.git" + + def test_ssh_git_url(self): + """Test parsing SSH git URLs.""" + source_type, url = parse_plugin_source("git@github.com:owner/repo.git") + assert source_type == "git" + assert url == "git@github.com:owner/repo.git" + + def test_git_protocol_url(self): + """Test parsing git:// protocol URLs.""" + source_type, url = parse_plugin_source("git://github.com/owner/repo.git") + assert source_type == "git" + assert url == "git://github.com/owner/repo.git" + + def test_absolute_local_path(self): + """Test parsing absolute local paths.""" + source_type, url = parse_plugin_source("/path/to/plugin") + assert source_type == "local" + assert url == "/path/to/plugin" + + def test_home_relative_path(self): + """Test parsing home-relative paths.""" + source_type, url = parse_plugin_source("~/plugins/my-plugin") + assert source_type == "local" + assert url == "~/plugins/my-plugin" + + def test_relative_path(self): + """Test parsing relative paths.""" + source_type, url = parse_plugin_source("./plugins/my-plugin") + assert source_type == "local" + assert url == "./plugins/my-plugin" + + def test_invalid_source(self): + """Test that unparseable sources raise error.""" + with pytest.raises(PluginFetchError, match="Unable to parse plugin source"): + parse_plugin_source("invalid-source-format") + + +class TestExtractReadableName: + """Tests for _extract_readable_name function.""" + + def test_github_shorthand(self): + """Test extracting name from GitHub shorthand.""" + name = _extract_readable_name("github:owner/my-plugin") + assert name == "my-plugin" + + def test_https_url(self): + """Test extracting name from HTTPS URL.""" + name = _extract_readable_name("https://github.com/owner/my-plugin.git") + assert name == "my-plugin" + + def test_ssh_url(self): + """Test extracting name from SSH URL.""" + name = _extract_readable_name("git@github.com:owner/my-plugin.git") + assert name == "my-plugin" + + def test_local_path(self): + """Test extracting name from local path.""" + name = _extract_readable_name("/path/to/my-plugin") + assert name == "my-plugin" + + def test_special_characters_sanitized(self): + """Test that special characters are sanitized.""" + name = _extract_readable_name("https://github.com/owner/my.special@plugin!.git") + assert name == "my-special-plugin" + + def test_long_name_truncated(self): + """Test that long names are truncated.""" + name = _extract_readable_name( + "github:owner/this-is-a-very-long-plugin-name-that-should-be-truncated" + ) + assert len(name) <= 32 + + +class TestGetCachePath: + """Tests for get_cache_path function.""" + + def test_deterministic_path(self, tmp_path: Path): + """Test that cache path is deterministic for same source.""" + source = "https://github.com/owner/repo.git" + path1 = get_cache_path(source, tmp_path) + path2 = get_cache_path(source, tmp_path) + assert path1 == path2 + + def test_different_sources_different_paths(self, tmp_path: Path): + """Test that different sources get different paths.""" + path1 = get_cache_path("https://github.com/owner/repo1.git", tmp_path) + path2 = get_cache_path("https://github.com/owner/repo2.git", tmp_path) + assert path1 != path2 + + def test_path_includes_readable_name(self, tmp_path: Path): + """Test that cache path includes readable name.""" + source = "https://github.com/owner/my-plugin.git" + path = get_cache_path(source, tmp_path) + assert "my-plugin" in path.name + + def test_default_cache_dir(self): + """Test that default cache dir is under ~/.openhands/cache/plugins/.""" + source = "https://github.com/owner/repo.git" + path = get_cache_path(source) + assert ".openhands" in str(path) + assert "cache" in str(path) + assert "plugins" in str(path) + + +class TestCloneRepository: + """Tests for _clone_repository function.""" + + def test_clone_calls_git_helper(self, tmp_path: Path): + """Test that clone delegates to GitHelper.""" + mock_git = create_autospec(GitHelper) + dest = tmp_path / "repo" + + _clone_repository("https://github.com/owner/repo.git", dest, None, mock_git) + + mock_git.clone.assert_called_once_with( + "https://github.com/owner/repo.git", dest, depth=1, branch=None + ) + + def test_clone_with_ref(self, tmp_path: Path): + """Test clone with branch/tag ref.""" + mock_git = create_autospec(GitHelper) + dest = tmp_path / "repo" + + _clone_repository("https://github.com/owner/repo.git", dest, "v1.0.0", mock_git) + + mock_git.clone.assert_called_once_with( + "https://github.com/owner/repo.git", dest, depth=1, branch="v1.0.0" + ) + + def test_clone_removes_existing_directory(self, tmp_path: Path): + """Test that existing non-git directory is removed.""" + mock_git = create_autospec(GitHelper) + dest = tmp_path / "repo" + dest.mkdir() + (dest / "some_file.txt").write_text("test") + + _clone_repository("https://github.com/owner/repo.git", dest, None, mock_git) + + mock_git.clone.assert_called_once() + + +class TestUpdateRepository: + """Tests for _update_repository function.""" + + def test_update_fetches_and_resets(self, tmp_path: Path): + """Test update fetches from origin and resets to branch.""" + mock_git = create_autospec(GitHelper) + mock_git.get_current_branch.return_value = "main" + + _update_repository(tmp_path, None, mock_git) + + mock_git.fetch.assert_called_once_with(tmp_path) + mock_git.get_current_branch.assert_called_once_with(tmp_path) + mock_git.reset_hard.assert_called_once_with(tmp_path, "origin/main") + + def test_update_with_ref_checks_out(self, tmp_path: Path): + """Test update with ref checks out that ref.""" + mock_git = create_autospec(GitHelper) + + _update_repository(tmp_path, "v1.0.0", mock_git) + + # fetch is called twice: once in _update_repository, once in _checkout_ref + assert mock_git.fetch.call_count == 2 + mock_git.checkout.assert_called_once_with(tmp_path, "v1.0.0") + + def test_update_handles_detached_head(self, tmp_path: Path): + """Test update handles detached HEAD state (no reset).""" + mock_git = create_autospec(GitHelper) + mock_git.get_current_branch.return_value = None + + _update_repository(tmp_path, None, mock_git) + + mock_git.fetch.assert_called_once() + mock_git.reset_hard.assert_not_called() + + def test_update_handles_git_error(self, tmp_path: Path): + """Test update continues on GitError (uses cached version).""" + mock_git = create_autospec(GitHelper) + mock_git.fetch.side_effect = GitError("Network error") + + _update_repository(tmp_path, None, mock_git) + + +class TestCheckoutRef: + """Tests for _checkout_ref function.""" + + def test_checkout_fetches_and_checks_out(self, tmp_path: Path): + """Test checkout fetches ref and checks out.""" + mock_git = create_autospec(GitHelper) + + _checkout_ref(tmp_path, "v1.0.0", mock_git) + + mock_git.fetch.assert_called_once_with(tmp_path, ref="v1.0.0") + mock_git.checkout.assert_called_once_with(tmp_path, "v1.0.0") + mock_git.reset_hard.assert_called_once_with(tmp_path, "origin/v1.0.0") + + def test_checkout_handles_fetch_error(self, tmp_path: Path): + """Test checkout continues if fetch fails (e.g., for commits).""" + mock_git = create_autospec(GitHelper) + mock_git.fetch.side_effect = GitError("Not a branch") + + _checkout_ref(tmp_path, "abc123", mock_git) + + mock_git.checkout.assert_called_once_with(tmp_path, "abc123") + + def test_checkout_handles_reset_error(self, tmp_path: Path): + """Test checkout continues if reset fails (e.g., for tags).""" + mock_git = create_autospec(GitHelper) + mock_git.reset_hard.side_effect = GitError("Not a branch") + + _checkout_ref(tmp_path, "v1.0.0", mock_git) + + mock_git.checkout.assert_called_once() + + +class TestFetchPlugin: + """Tests for fetch_plugin function.""" + + def test_fetch_local_path(self, tmp_path: Path): + """Test fetching from local path returns the path unchanged.""" + plugin_dir = tmp_path / "my-plugin" + plugin_dir.mkdir() + + result = fetch_plugin(str(plugin_dir)) + assert result == plugin_dir.resolve() + + def test_fetch_local_path_nonexistent(self, tmp_path: Path): + """Test fetching nonexistent local path raises error.""" + with pytest.raises(PluginFetchError, match="does not exist"): + fetch_plugin(str(tmp_path / "nonexistent")) + + def test_fetch_github_shorthand_clones(self, tmp_path: Path): + """Test fetching GitHub shorthand clones the repository.""" + mock_git = create_autospec(GitHelper) + + def clone_side_effect(url, dest, **kwargs): + dest.mkdir(parents=True, exist_ok=True) + (dest / ".git").mkdir() + + mock_git.clone.side_effect = clone_side_effect + + result = fetch_plugin( + "github:owner/repo", cache_dir=tmp_path, git_helper=mock_git + ) + + assert result.exists() + mock_git.clone.assert_called_once() + call_args = mock_git.clone.call_args + assert call_args[0][0] == "https://github.com/owner/repo.git" + + def test_fetch_with_ref(self, tmp_path: Path): + """Test fetching with specific ref.""" + mock_git = create_autospec(GitHelper) + + def clone_side_effect(url, dest, **kwargs): + dest.mkdir(parents=True, exist_ok=True) + (dest / ".git").mkdir() + + mock_git.clone.side_effect = clone_side_effect + + fetch_plugin( + "github:owner/repo", cache_dir=tmp_path, ref="v1.0.0", git_helper=mock_git + ) + + mock_git.clone.assert_called_once() + call_kwargs = mock_git.clone.call_args[1] + assert call_kwargs["branch"] == "v1.0.0" + + def test_fetch_updates_existing_cache(self, tmp_path: Path): + """Test that fetch updates existing cached repository.""" + mock_git = create_autospec(GitHelper) + mock_git.get_current_branch.return_value = "main" + + cache_path = get_cache_path("https://github.com/owner/repo.git", tmp_path) + cache_path.mkdir(parents=True) + (cache_path / ".git").mkdir() + + result = fetch_plugin( + "github:owner/repo", cache_dir=tmp_path, update=True, git_helper=mock_git + ) + + assert result == cache_path + mock_git.fetch.assert_called() + mock_git.clone.assert_not_called() + + def test_fetch_no_update_uses_cache(self, tmp_path: Path): + """Test that fetch with update=False uses cached version.""" + mock_git = create_autospec(GitHelper) + + cache_path = get_cache_path("https://github.com/owner/repo.git", tmp_path) + cache_path.mkdir(parents=True) + (cache_path / ".git").mkdir() + + result = fetch_plugin( + "github:owner/repo", cache_dir=tmp_path, update=False, git_helper=mock_git + ) + + assert result == cache_path + mock_git.clone.assert_not_called() + mock_git.fetch.assert_not_called() + + def test_fetch_no_update_with_ref_checks_out(self, tmp_path: Path): + """Test that fetch with update=False but ref still checks out.""" + mock_git = create_autospec(GitHelper) + + cache_path = get_cache_path("https://github.com/owner/repo.git", tmp_path) + cache_path.mkdir(parents=True) + (cache_path / ".git").mkdir() + + fetch_plugin( + "github:owner/repo", + cache_dir=tmp_path, + update=False, + ref="v1.0.0", + git_helper=mock_git, + ) + + mock_git.checkout.assert_called_once_with(cache_path, "v1.0.0") + + def test_fetch_git_error_raises_plugin_fetch_error(self, tmp_path: Path): + """Test that git errors are wrapped in PluginFetchError.""" + mock_git = create_autospec(GitHelper) + mock_git.clone.side_effect = GitError("fatal: repository not found") + + with pytest.raises(PluginFetchError, match="Git operation failed"): + fetch_plugin( + "github:owner/nonexistent", cache_dir=tmp_path, git_helper=mock_git + ) + + def test_fetch_generic_error_wrapped(self, tmp_path: Path): + """Test that generic errors are wrapped in PluginFetchError.""" + mock_git = create_autospec(GitHelper) + mock_git.clone.side_effect = RuntimeError("Unexpected error") + + with pytest.raises(PluginFetchError, match="Failed to fetch plugin"): + fetch_plugin("github:owner/repo", cache_dir=tmp_path, git_helper=mock_git) + + +class TestPluginFetchMethod: + """Tests for Plugin.fetch() classmethod.""" + + def test_fetch_delegates_to_fetch_plugin(self, tmp_path: Path): + """Test that Plugin.fetch() delegates to fetch_plugin.""" + plugin_dir = tmp_path / "my-plugin" + plugin_dir.mkdir() + + result = Plugin.fetch(str(plugin_dir)) + assert result == plugin_dir.resolve() + + def test_fetch_local_path_with_tilde(self, tmp_path: Path): + """Test fetching local path with ~ expansion.""" + plugin_dir = tmp_path / "my-plugin" + plugin_dir.mkdir() + + with patch("openhands.sdk.plugin.fetch.Path.home", return_value=tmp_path): + result = Plugin.fetch(str(plugin_dir)) + assert result.exists() + + +class TestSubpathParameter: + """Tests for subpath parameter in fetch_plugin() and Plugin.fetch().""" + + def test_fetch_local_path_with_subpath(self, tmp_path: Path): + """Test fetching local path with subpath returns the subdirectory.""" + plugin_dir = tmp_path / "monorepo" + plugin_dir.mkdir() + subplugin_dir = plugin_dir / "plugins" / "my-plugin" + subplugin_dir.mkdir(parents=True) + + result = fetch_plugin(str(plugin_dir), subpath="plugins/my-plugin") + assert result == subplugin_dir.resolve() + + def test_fetch_local_path_with_nonexistent_subpath(self, tmp_path: Path): + """Test fetching local path with nonexistent subpath raises error.""" + plugin_dir = tmp_path / "monorepo" + plugin_dir.mkdir() + + with pytest.raises(PluginFetchError, match="Subdirectory.*not found"): + fetch_plugin(str(plugin_dir), subpath="nonexistent/path") + + def test_fetch_local_path_with_subpath_leading_slash(self, tmp_path: Path): + """Test that leading slashes are stripped from subpath.""" + plugin_dir = tmp_path / "monorepo" + plugin_dir.mkdir() + subplugin_dir = plugin_dir / "plugins" / "my-plugin" + subplugin_dir.mkdir(parents=True) + + result = fetch_plugin(str(plugin_dir), subpath="/plugins/my-plugin/") + assert result == subplugin_dir.resolve() + + def test_fetch_github_with_subpath(self, tmp_path: Path): + """Test fetching from GitHub with subpath returns subdirectory.""" + mock_git = create_autospec(GitHelper) + + def clone_side_effect(url, dest, **kwargs): + dest.mkdir(parents=True, exist_ok=True) + (dest / ".git").mkdir() + # Create the subdirectory structure + subdir = dest / "plugins" / "sub-plugin" + subdir.mkdir(parents=True) + + mock_git.clone.side_effect = clone_side_effect + + result = fetch_plugin( + "github:owner/monorepo", + cache_dir=tmp_path, + subpath="plugins/sub-plugin", + git_helper=mock_git, + ) + + assert result.exists() + assert result.name == "sub-plugin" + assert "plugins" in str(result) + + def test_fetch_github_with_nonexistent_subpath(self, tmp_path: Path): + """Test fetching from GitHub with nonexistent subpath raises error.""" + mock_git = create_autospec(GitHelper) + + def clone_side_effect(url, dest, **kwargs): + dest.mkdir(parents=True, exist_ok=True) + (dest / ".git").mkdir() + + mock_git.clone.side_effect = clone_side_effect + + with pytest.raises(PluginFetchError, match="Subdirectory.*not found"): + fetch_plugin( + "github:owner/repo", + cache_dir=tmp_path, + subpath="nonexistent", + git_helper=mock_git, + ) + + def test_fetch_with_subpath_and_ref(self, tmp_path: Path): + """Test fetching with both subpath and ref.""" + mock_git = create_autospec(GitHelper) + + def clone_side_effect(url, dest, **kwargs): + dest.mkdir(parents=True, exist_ok=True) + (dest / ".git").mkdir() + subdir = dest / "plugins" / "my-plugin" + subdir.mkdir(parents=True) + + mock_git.clone.side_effect = clone_side_effect + + result = fetch_plugin( + "github:owner/monorepo", + cache_dir=tmp_path, + ref="v1.0.0", + subpath="plugins/my-plugin", + git_helper=mock_git, + ) + + assert result.exists() + mock_git.clone.assert_called_once() + call_kwargs = mock_git.clone.call_args[1] + assert call_kwargs["branch"] == "v1.0.0" + + def test_plugin_fetch_with_subpath(self, tmp_path: Path): + """Test Plugin.fetch() with subpath parameter.""" + plugin_dir = tmp_path / "monorepo" + plugin_dir.mkdir() + subplugin_dir = plugin_dir / "plugins" / "my-plugin" + subplugin_dir.mkdir(parents=True) + + result = Plugin.fetch(str(plugin_dir), subpath="plugins/my-plugin") + assert result == subplugin_dir.resolve() + + def test_fetch_no_subpath_returns_root(self, tmp_path: Path): + """Test that fetch without subpath returns repository root.""" + mock_git = create_autospec(GitHelper) + + def clone_side_effect(url, dest, **kwargs): + dest.mkdir(parents=True, exist_ok=True) + (dest / ".git").mkdir() + (dest / "plugins").mkdir() + + mock_git.clone.side_effect = clone_side_effect + + result = fetch_plugin( + "github:owner/repo", + cache_dir=tmp_path, + subpath=None, + git_helper=mock_git, + ) + + assert result.exists() + assert (result / ".git").exists() + + +class TestSetGitHelper: + """Tests for set_git_helper and get_git_helper functions.""" + + def test_set_and_get_git_helper(self): + """Test setting and getting a custom GitHelper instance.""" + # Save original + original = get_git_helper() + + try: + # Set a custom helper + custom_helper = GitHelper() + set_git_helper(custom_helper) + assert get_git_helper() is custom_helper + + # Reset to None to test lazy initialization + set_git_helper(None) + # Next get should create a new instance + new_helper = get_git_helper() + assert new_helper is not None + assert new_helper is not custom_helper + finally: + # Restore original + set_git_helper(original) + + +class TestParsePluginSourceEdgeCases: + """Additional edge case tests for parse_plugin_source.""" + + def test_relative_path_with_slash(self): + """Test parsing paths like 'plugins/my-plugin' (line 108).""" + source_type, url = parse_plugin_source("plugins/my-plugin") + assert source_type == "local" + assert url == "plugins/my-plugin" + + def test_nested_relative_path(self): + """Test parsing nested relative paths.""" + source_type, url = parse_plugin_source("path/to/my/plugin") + assert source_type == "local" + assert url == "path/to/my/plugin" + + +class TestFetchPluginEdgeCases: + """Additional edge case tests for fetch_plugin.""" + + def test_fetch_uses_default_cache_dir(self, tmp_path: Path): + """Test fetch_plugin uses DEFAULT_CACHE_DIR when cache_dir is None.""" + mock_git = create_autospec(GitHelper) + + def clone_side_effect(url, dest, **kwargs): + dest.mkdir(parents=True, exist_ok=True) + (dest / ".git").mkdir() + + mock_git.clone.side_effect = clone_side_effect + + # Patch DEFAULT_CACHE_DIR to use tmp_path + with patch("openhands.sdk.plugin.fetch.DEFAULT_CACHE_DIR", tmp_path / "cache"): + result = fetch_plugin( + "github:owner/repo", + cache_dir=None, # Explicitly None to trigger line 225 + git_helper=mock_git, + ) + + assert result.exists() + assert str(tmp_path / "cache") in str(result) + + def test_fetch_reraises_plugin_fetch_error(self, tmp_path: Path): + """Test that PluginFetchError is re-raised directly (line 248).""" + mock_git = create_autospec(GitHelper) + + # Make clone raise PluginFetchError to test the re-raise path + mock_git.clone.side_effect = PluginFetchError("Test error from clone") + + # This should re-raise the PluginFetchError directly + with pytest.raises(PluginFetchError, match="Test error from clone"): + fetch_plugin( + "github:owner/repo", + cache_dir=tmp_path, + git_helper=mock_git, + ) + + +class TestGitHelperErrors: + """Tests for GitHelper error handling paths.""" + + def test_clone_called_process_error(self, tmp_path: Path): + """Test clone handles CalledProcessError (lines 62-64).""" + git = GitHelper() + dest = tmp_path / "repo" + + # Try to clone a non-existent repo + with pytest.raises(GitError, match="Clone failed"): + git.clone("https://invalid.example.com/nonexistent.git", dest, timeout=5) + + def test_clone_timeout(self, tmp_path: Path): + """Test clone handles TimeoutExpired (lines 65-66).""" + git = GitHelper() + dest = tmp_path / "repo" + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired(cmd=["git"], timeout=1) + with pytest.raises(GitError, match="timed out"): + git.clone("https://github.com/owner/repo.git", dest, timeout=1) + + def test_fetch_with_ref(self, tmp_path: Path): + """Test fetch appends ref to command (line 88).""" + # Create a repo to fetch in + repo = tmp_path / "repo" + repo.mkdir() + subprocess.run(["git", "init"], cwd=repo, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], cwd=repo, check=True + ) + subprocess.run(["git", "config", "user.name", "Test"], cwd=repo, check=True) + (repo / "file.txt").write_text("content") + subprocess.run(["git", "add", "."], cwd=repo, check=True) + subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo, check=True) + + git = GitHelper() + # This will fail because there's no remote, but it will hit line 88 + with pytest.raises(GitError, match="Fetch failed"): + git.fetch(repo, ref="main") + + def test_fetch_called_process_error(self, tmp_path: Path): + """Test fetch handles CalledProcessError (lines 100-102).""" + git = GitHelper() + repo = tmp_path / "not-a-repo" + repo.mkdir() + + with pytest.raises(GitError, match="Fetch failed"): + git.fetch(repo) + + def test_fetch_timeout(self, tmp_path: Path): + """Test fetch handles TimeoutExpired (lines 103-104).""" + git = GitHelper() + repo = tmp_path / "repo" + repo.mkdir() + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired(cmd=["git"], timeout=1) + with pytest.raises(GitError, match="timed out"): + git.fetch(repo, timeout=1) + + def test_checkout_called_process_error(self, tmp_path: Path): + """Test checkout handles CalledProcessError (lines 129-131).""" + git = GitHelper() + repo = tmp_path / "repo" + repo.mkdir() + subprocess.run(["git", "init"], cwd=repo, check=True) + + with pytest.raises(GitError, match="Checkout failed"): + git.checkout(repo, "nonexistent-ref") + + def test_checkout_timeout(self, tmp_path: Path): + """Test checkout handles TimeoutExpired (lines 132-133).""" + git = GitHelper() + repo = tmp_path / "repo" + repo.mkdir() + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired(cmd=["git"], timeout=1) + with pytest.raises(GitError, match="timed out"): + git.checkout(repo, "main", timeout=1) + + def test_reset_hard_called_process_error(self, tmp_path: Path): + """Test reset_hard handles CalledProcessError (lines 158-160).""" + git = GitHelper() + repo = tmp_path / "repo" + repo.mkdir() + subprocess.run(["git", "init"], cwd=repo, check=True) + + with pytest.raises(GitError, match="Reset failed"): + git.reset_hard(repo, "nonexistent-ref") + + def test_reset_hard_timeout(self, tmp_path: Path): + """Test reset_hard handles TimeoutExpired (lines 161-162).""" + git = GitHelper() + repo = tmp_path / "repo" + repo.mkdir() + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired(cmd=["git"], timeout=1) + with pytest.raises(GitError, match="timed out"): + git.reset_hard(repo, "HEAD", timeout=1) + + def test_get_current_branch_called_process_error(self, tmp_path: Path): + """Test get_current_branch handles CalledProcessError (lines 192-194).""" + git = GitHelper() + repo = tmp_path / "repo" + repo.mkdir() + + # Mock subprocess.run to raise CalledProcessError + with patch("subprocess.run") as mock_run: + mock_run.side_effect = subprocess.CalledProcessError( + returncode=1, cmd=["git"], stderr="fatal: not a git repository" + ) + with pytest.raises(GitError, match="Failed to get current branch"): + git.get_current_branch(repo) + + def test_get_current_branch_timeout(self, tmp_path: Path): + """Test get_current_branch handles TimeoutExpired (lines 195-196).""" + git = GitHelper() + repo = tmp_path / "repo" + repo.mkdir() + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = subprocess.TimeoutExpired(cmd=["git"], timeout=1) + with pytest.raises(GitError, match="timed out"): + git.get_current_branch(repo, timeout=1) diff --git a/tests/sdk/plugin/test_plugin_fetch_integration.py b/tests/sdk/plugin/test_plugin_fetch_integration.py new file mode 100644 index 0000000000..99ebefe2bb --- /dev/null +++ b/tests/sdk/plugin/test_plugin_fetch_integration.py @@ -0,0 +1,296 @@ +"""Integration tests for Plugin.fetch() with real git operations. + +These tests perform actual git operations and may require network access. +They are designed to test the full end-to-end flow of plugin fetching. +""" + +import subprocess +from pathlib import Path + +from openhands.sdk.plugin import GitHelper, Plugin +from openhands.sdk.plugin.fetch import fetch_plugin + + +class TestGitHelperIntegration: + """Integration tests for GitHelper with real git operations.""" + + def test_clone_real_repo(self, tmp_path: Path): + """Test cloning a real repository.""" + git = GitHelper() + dest = tmp_path / "repo" + + # Create a local bare repo to clone from + bare_repo = tmp_path / "bare.git" + subprocess.run(["git", "init", "--bare", str(bare_repo)], check=True) + + git.clone(f"file://{bare_repo}", dest) + + assert dest.exists() + assert (dest / ".git").exists() + + def test_clone_with_branch(self, tmp_path: Path): + """Test cloning with a specific branch.""" + git = GitHelper() + + # Create a source repo with a branch + source = tmp_path / "source" + source.mkdir() + subprocess.run(["git", "init"], cwd=source, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], cwd=source, check=True + ) + subprocess.run(["git", "config", "user.name", "Test"], cwd=source, check=True) + (source / "file.txt").write_text("content") + subprocess.run(["git", "add", "."], cwd=source, check=True) + subprocess.run(["git", "commit", "-m", "Initial"], cwd=source, check=True) + subprocess.run(["git", "branch", "feature"], cwd=source, check=True) + + dest = tmp_path / "dest" + git.clone(f"file://{source}", dest, branch="feature") + + assert dest.exists() + # Verify we're on the feature branch + result = subprocess.run( + ["git", "branch", "--show-current"], + cwd=dest, + capture_output=True, + text=True, + ) + assert result.stdout.strip() == "feature" + + def test_fetch_and_checkout(self, tmp_path: Path): + """Test fetch and checkout operations.""" + git = GitHelper() + + # Create source repo + source = tmp_path / "source" + source.mkdir() + subprocess.run(["git", "init"], cwd=source, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], cwd=source, check=True + ) + subprocess.run(["git", "config", "user.name", "Test"], cwd=source, check=True) + (source / "file.txt").write_text("v1") + subprocess.run(["git", "add", "."], cwd=source, check=True) + subprocess.run(["git", "commit", "-m", "v1"], cwd=source, check=True) + subprocess.run(["git", "tag", "v1.0.0"], cwd=source, check=True) + + # Clone it + dest = tmp_path / "dest" + git.clone(f"file://{source}", dest, depth=None) + + # Make changes in source + (source / "file.txt").write_text("v2") + subprocess.run(["git", "add", "."], cwd=source, check=True) + subprocess.run(["git", "commit", "-m", "v2"], cwd=source, check=True) + + # Fetch and verify + git.fetch(dest) + + # Checkout tag + git.checkout(dest, "v1.0.0") + assert (dest / "file.txt").read_text() == "v1" + + def test_get_current_branch(self, tmp_path: Path): + """Test getting current branch name.""" + git = GitHelper() + + # Create repo + repo = tmp_path / "repo" + repo.mkdir() + subprocess.run(["git", "init"], cwd=repo, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], cwd=repo, check=True + ) + subprocess.run(["git", "config", "user.name", "Test"], cwd=repo, check=True) + (repo / "file.txt").write_text("content") + subprocess.run(["git", "add", "."], cwd=repo, check=True) + subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo, check=True) + + # Default branch + branch = git.get_current_branch(repo) + assert branch in ("main", "master") + + # Create and switch to new branch + subprocess.run(["git", "checkout", "-b", "develop"], cwd=repo, check=True) + branch = git.get_current_branch(repo) + assert branch == "develop" + + def test_get_current_branch_detached_head(self, tmp_path: Path): + """Test that detached HEAD returns None.""" + git = GitHelper() + + # Create repo with commits + repo = tmp_path / "repo" + repo.mkdir() + subprocess.run(["git", "init"], cwd=repo, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], cwd=repo, check=True + ) + subprocess.run(["git", "config", "user.name", "Test"], cwd=repo, check=True) + (repo / "file.txt").write_text("v1") + subprocess.run(["git", "add", "."], cwd=repo, check=True) + subprocess.run(["git", "commit", "-m", "v1"], cwd=repo, check=True) + (repo / "file.txt").write_text("v2") + subprocess.run(["git", "add", "."], cwd=repo, check=True) + subprocess.run(["git", "commit", "-m", "v2"], cwd=repo, check=True) + + # Get commit hash of first commit + result = subprocess.run( + ["git", "rev-list", "--max-parents=0", "HEAD"], + cwd=repo, + capture_output=True, + text=True, + ) + first_commit = result.stdout.strip() + + # Detach HEAD + subprocess.run(["git", "checkout", first_commit], cwd=repo, check=True) + + branch = git.get_current_branch(repo) + assert branch is None + + +class TestFetchPluginIntegration: + """Integration tests for fetch_plugin with real git operations.""" + + def test_fetch_from_local_git_repo(self, tmp_path: Path): + """Test fetching a plugin from a local git repository.""" + # Create a plugin repo + plugin_repo = tmp_path / "my-plugin" + plugin_repo.mkdir() + subprocess.run(["git", "init"], cwd=plugin_repo, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=plugin_repo, + check=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], cwd=plugin_repo, check=True + ) + + # Add plugin files + (plugin_repo / ".plugin").mkdir() + (plugin_repo / ".plugin" / "plugin.json").write_text( + '{"name": "test-plugin", "version": "1.0.0", "description": "Test"}' + ) + subprocess.run(["git", "add", "."], cwd=plugin_repo, check=True) + subprocess.run(["git", "commit", "-m", "Initial"], cwd=plugin_repo, check=True) + + # Fetch it + cache_dir = tmp_path / "cache" + result = fetch_plugin(f"file://{plugin_repo}", cache_dir=cache_dir) + + assert result.exists() + assert (result / ".plugin" / "plugin.json").exists() + + def test_fetch_caches_and_updates(self, tmp_path: Path): + """Test that fetch caches and updates work correctly.""" + # Create plugin repo + plugin_repo = tmp_path / "plugin" + plugin_repo.mkdir() + subprocess.run(["git", "init"], cwd=plugin_repo, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=plugin_repo, + check=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], cwd=plugin_repo, check=True + ) + (plugin_repo / "version.txt").write_text("v1") + subprocess.run(["git", "add", "."], cwd=plugin_repo, check=True) + subprocess.run(["git", "commit", "-m", "v1"], cwd=plugin_repo, check=True) + + cache_dir = tmp_path / "cache" + + # First fetch + result1 = fetch_plugin(f"file://{plugin_repo}", cache_dir=cache_dir) + assert (result1 / "version.txt").read_text() == "v1" + + # Update source + (plugin_repo / "version.txt").write_text("v2") + subprocess.run(["git", "add", "."], cwd=plugin_repo, check=True) + subprocess.run(["git", "commit", "-m", "v2"], cwd=plugin_repo, check=True) + + # Fetch with update=True + result2 = fetch_plugin( + f"file://{plugin_repo}", cache_dir=cache_dir, update=True + ) + assert result1 == result2 # Same cache path + assert (result2 / "version.txt").read_text() == "v2" + + def test_fetch_with_ref(self, tmp_path: Path): + """Test fetching a specific ref.""" + # Create plugin repo with tags + plugin_repo = tmp_path / "plugin" + plugin_repo.mkdir() + subprocess.run(["git", "init"], cwd=plugin_repo, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=plugin_repo, + check=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], cwd=plugin_repo, check=True + ) + + # v1 + (plugin_repo / "version.txt").write_text("v1") + subprocess.run(["git", "add", "."], cwd=plugin_repo, check=True) + subprocess.run(["git", "commit", "-m", "v1"], cwd=plugin_repo, check=True) + subprocess.run(["git", "tag", "v1.0.0"], cwd=plugin_repo, check=True) + + # v2 + (plugin_repo / "version.txt").write_text("v2") + subprocess.run(["git", "add", "."], cwd=plugin_repo, check=True) + subprocess.run(["git", "commit", "-m", "v2"], cwd=plugin_repo, check=True) + + cache_dir = tmp_path / "cache" + + # Fetch v1.0.0 + result = fetch_plugin( + f"file://{plugin_repo}", cache_dir=cache_dir, ref="v1.0.0" + ) + assert (result / "version.txt").read_text() == "v1" + + +class TestPluginFetchMethodIntegration: + """Integration tests for Plugin.fetch() classmethod.""" + + def test_fetch_and_load_plugin(self, tmp_path: Path): + """Test the full fetch and load workflow.""" + # Create a complete plugin + plugin_repo = tmp_path / "complete-plugin" + plugin_repo.mkdir() + subprocess.run(["git", "init"], cwd=plugin_repo, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=plugin_repo, + check=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], cwd=plugin_repo, check=True + ) + + # Create plugin structure + (plugin_repo / ".plugin").mkdir() + (plugin_repo / ".plugin" / "plugin.json").write_text( + """{ + "name": "complete-plugin", + "version": "1.0.0", + "description": "A complete test plugin" + }""" + ) + + subprocess.run(["git", "add", "."], cwd=plugin_repo, check=True) + subprocess.run(["git", "commit", "-m", "Initial"], cwd=plugin_repo, check=True) + + # Fetch and load + cache_dir = tmp_path / "cache" + plugin_path = Plugin.fetch(f"file://{plugin_repo}", cache_dir=cache_dir) + plugin = Plugin.load(plugin_path) + + assert plugin.name == "complete-plugin" + assert plugin.version == "1.0.0" + assert plugin.description == "A complete test plugin" diff --git a/tests/sdk/plugin/test_plugin_loading.py b/tests/sdk/plugin/test_plugin_loading.py index 393b0b16af..749901abb4 100644 --- a/tests/sdk/plugin/test_plugin_loading.py +++ b/tests/sdk/plugin/test_plugin_loading.py @@ -5,6 +5,12 @@ import pytest from openhands.sdk.plugin import Plugin, PluginManifest +from openhands.sdk.plugin.types import ( + AgentDefinition, + CommandDefinition, + PluginAuthor, + _extract_examples, +) class TestPluginManifest: @@ -261,3 +267,350 @@ def test_load_plugin_with_invalid_manifest(self, tmp_path: Path): with pytest.raises(ValueError, match="Invalid JSON"): Plugin.load(plugin_dir) + + def test_load_all_nonexistent_directory(self, tmp_path: Path): + """Test load_all with nonexistent directory returns empty list.""" + plugins = Plugin.load_all(tmp_path / "nonexistent") + assert plugins == [] + + def test_load_all_with_failing_plugin(self, tmp_path: Path): + """Test load_all continues when a plugin fails to load (lines 197-198).""" + plugins_dir = tmp_path / "plugins" + plugins_dir.mkdir() + + # Create a valid plugin + valid_dir = plugins_dir / "valid-plugin" + valid_dir.mkdir() + manifest_dir = valid_dir / ".plugin" + manifest_dir.mkdir() + (manifest_dir / "plugin.json").write_text('{"name": "valid-plugin"}') + + # Create an invalid plugin (will fail to load) + invalid_dir = plugins_dir / "invalid-plugin" + invalid_dir.mkdir() + invalid_manifest_dir = invalid_dir / ".plugin" + invalid_manifest_dir.mkdir() + (invalid_manifest_dir / "plugin.json").write_text("not valid json") + + plugins = Plugin.load_all(plugins_dir) + + # Should load the valid plugin and skip the invalid one + assert len(plugins) == 1 + assert plugins[0].name == "valid-plugin" + + def test_load_plugin_with_author_string(self, tmp_path: Path): + """Test loading manifest with author as string (line 225).""" + plugin_dir = tmp_path / "author-plugin" + plugin_dir.mkdir() + manifest_dir = plugin_dir / ".plugin" + manifest_dir.mkdir() + + # Write manifest with author as string + manifest_file = manifest_dir / "plugin.json" + manifest_file.write_text( + """{ + "name": "author-plugin", + "version": "1.0.0", + "author": "Test Author " + }""" + ) + + plugin = Plugin.load(plugin_dir) + + assert plugin.name == "author-plugin" + assert plugin.manifest.author is not None + assert plugin.manifest.author.name == "Test Author" + assert plugin.manifest.author.email == "test@example.com" + + def test_load_plugin_with_manifest_parse_error(self, tmp_path: Path): + """Test loading manifest with parse error (lines 230-231).""" + plugin_dir = tmp_path / "error-plugin" + plugin_dir.mkdir() + manifest_dir = plugin_dir / ".plugin" + manifest_dir.mkdir() + + # Write manifest with missing required field or wrong type + # This will parse as JSON but fail Pydantic validation + manifest_file = manifest_dir / "plugin.json" + manifest_file.write_text('{"name": 123}') # name should be string + + with pytest.raises(ValueError, match="Failed to parse manifest"): + Plugin.load(plugin_dir) + + +class TestPluginAuthor: + """Tests for PluginAuthor parsing.""" + + def test_from_string_with_email(self): + """Test parsing author string with email (lines 22-25).""" + author = PluginAuthor.from_string("John Doe ") + assert author.name == "John Doe" + assert author.email == "john@example.com" + + def test_from_string_without_email(self): + """Test parsing author string without email (line 26).""" + author = PluginAuthor.from_string("John Doe") + assert author.name == "John Doe" + assert author.email is None + + def test_from_string_with_whitespace(self): + """Test parsing author string with extra whitespace.""" + author = PluginAuthor.from_string(" John Doe < john@example.com > ") + assert author.name == "John Doe" + assert author.email == "john@example.com" + + +class TestExtractExamples: + """Tests for _extract_examples function.""" + + def test_extract_single_example(self): + """Test extracting single example (lines 42-44).""" + description = "A tool. Use when X" + examples = _extract_examples(description) + assert examples == ["Use when X"] + + def test_extract_multiple_examples(self): + """Test extracting multiple examples.""" + description = "First text Second" + examples = _extract_examples(description) + assert examples == ["First", "Second"] + + def test_extract_no_examples(self): + """Test when no examples present.""" + description = "A tool without examples" + examples = _extract_examples(description) + assert examples == [] + + def test_extract_multiline_example(self): + """Test extracting multiline example.""" + description = """ + Multi + Line + """ + examples = _extract_examples(description) + assert len(examples) == 1 + assert "Multi" in examples[0] + + +class TestAgentDefinition: + """Tests for AgentDefinition loading.""" + + def test_load_agent_basic(self, tmp_path: Path): + """Test loading a basic agent definition (lines 99-126).""" + agent_md = tmp_path / "test-agent.md" + agent_md.write_text( + """--- +name: test-agent +description: A test agent +model: gpt-4 +tools: + - Read + - Write +--- + +You are a test agent. +""" + ) + + agent = AgentDefinition.load(agent_md) + + assert agent.name == "test-agent" + assert agent.description == "A test agent" + assert agent.model == "gpt-4" + assert agent.tools == ["Read", "Write"] + assert agent.system_prompt == "You are a test agent." + + def test_load_agent_with_examples(self, tmp_path: Path): + """Test loading agent with when_to_use examples.""" + agent_md = tmp_path / "helper.md" + agent_md.write_text( + """--- +name: helper +description: A helper. When user needs help +--- + +Help the user. +""" + ) + + agent = AgentDefinition.load(agent_md) + assert len(agent.when_to_use_examples) == 1 + assert "When user needs help" in agent.when_to_use_examples[0] + + def test_load_agent_with_color(self, tmp_path: Path): + """Test loading agent with color.""" + agent_md = tmp_path / "colored.md" + agent_md.write_text( + """--- +name: colored +color: blue +--- + +Content. +""" + ) + + agent = AgentDefinition.load(agent_md) + assert agent.color == "blue" + + def test_load_agent_with_tools_as_string(self, tmp_path: Path): + """Test loading agent with tools as single string.""" + agent_md = tmp_path / "single-tool.md" + agent_md.write_text( + """--- +name: single-tool +tools: Read +--- + +Content. +""" + ) + + agent = AgentDefinition.load(agent_md) + assert agent.tools == ["Read"] + + def test_load_agent_defaults(self, tmp_path: Path): + """Test agent defaults when fields not provided.""" + agent_md = tmp_path / "minimal.md" + agent_md.write_text( + """--- +--- + +Just content. +""" + ) + + agent = AgentDefinition.load(agent_md) + assert agent.name == "minimal" # From filename + assert agent.model == "inherit" + assert agent.tools == [] + + def test_load_agent_with_metadata(self, tmp_path: Path): + """Test loading agent with extra metadata.""" + agent_md = tmp_path / "meta.md" + agent_md.write_text( + """--- +name: meta-agent +custom_field: custom_value +--- + +Content. +""" + ) + + agent = AgentDefinition.load(agent_md) + assert agent.metadata.get("custom_field") == "custom_value" + + +class TestCommandDefinition: + """Tests for CommandDefinition loading.""" + + def test_load_command_basic(self, tmp_path: Path): + """Test loading a basic command definition (lines 184-218).""" + command_md = tmp_path / "review.md" + command_md.write_text( + """--- +description: Review code +argument-hint: +allowed-tools: + - Read + - Grep +--- + +Review the specified file. +""" + ) + + command = CommandDefinition.load(command_md) + + assert command.name == "review" + assert command.description == "Review code" + assert command.argument_hint == "" + assert command.allowed_tools == ["Read", "Grep"] + assert command.content == "Review the specified file." + + def test_load_command_with_argument_hint_list(self, tmp_path: Path): + """Test loading command with argument-hint as list.""" + command_md = tmp_path / "multi-arg.md" + command_md.write_text( + """--- +description: Multi arg command +argument-hint: + - + - +--- + +Content. +""" + ) + + command = CommandDefinition.load(command_md) + assert command.argument_hint == " " + + def test_load_command_with_camel_case_fields(self, tmp_path: Path): + """Test loading command with camelCase field names.""" + command_md = tmp_path / "camel.md" + command_md.write_text( + """--- +description: Camel case command +argumentHint: +allowedTools: + - Tool1 +--- + +Content. +""" + ) + + command = CommandDefinition.load(command_md) + assert command.argument_hint == "" + assert command.allowed_tools == ["Tool1"] + + def test_load_command_with_allowed_tools_as_string(self, tmp_path: Path): + """Test loading command with allowed-tools as string.""" + command_md = tmp_path / "single-tool.md" + command_md.write_text( + """--- +description: Single tool +allowed-tools: Read +--- + +Content. +""" + ) + + command = CommandDefinition.load(command_md) + assert command.allowed_tools == ["Read"] + + def test_load_command_defaults(self, tmp_path: Path): + """Test command defaults when fields not provided.""" + command_md = tmp_path / "minimal.md" + command_md.write_text( + """--- +--- + +Just instructions. +""" + ) + + command = CommandDefinition.load(command_md) + assert command.name == "minimal" + assert command.description == "" + assert command.argument_hint is None + assert command.allowed_tools == [] + + def test_load_command_with_metadata(self, tmp_path: Path): + """Test loading command with extra metadata.""" + command_md = tmp_path / "meta.md" + command_md.write_text( + """--- +description: Meta command +custom_field: custom_value +--- + +Content. +""" + ) + + command = CommandDefinition.load(command_md) + assert command.metadata.get("custom_field") == "custom_value"