diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..01e5c9c
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,187 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+github_linter is a Python tool for auditing GitHub repositories at scale. It scans repositories for common configuration issues, missing files, and standardization opportunities across multiple repos.
+
+## Development Commands
+
+### Testing and Linting
+
+- Run all precommit checks: `make precommit`
+- Run linting: `uv run ruff check github_linter tests`
+- Run type checking: `uv run mypy --strict github_linter tests`
+- Run tests: `uv run pytest github_linter tests`
+- Run a single test: `uv run pytest tests/test_<module>.py::<test_name>`
+
+### Running the CLI
+
+- Run the CLI: `uv run python -m github_linter`
+- Run with filters: `uv run python -m github_linter --repo <repo> --owner <owner>`
+- Run a specific module: `uv run python -m github_linter --module <module>`
+- Run with fixes: `uv run python -m github_linter --fix`
+- List available repos: `uv run python -m github_linter --list-repos`
+
+### Web Interface
+
+- Start the web server: `uv run python -m github_linter.web`
+- Or use the script: `./run_web.sh`
+
+### Docker
+
+- Build the container: `make docker_build`
+- Run the web server in a container: `make docker_run`
+
+## Architecture
+
+### Core Components
+
+1. **GithubLinter** (`github_linter/__init__.py`) - Main orchestrator that:
+   - Handles GitHub authentication (via the `GITHUB_TOKEN` environment variable or config file)
+   - Manages rate limiting
+   - Coordinates module execution across repositories
+   - Generates reports
+
+2. **RepoLinter** (`github_linter/repolinter.py`) - Per-repository handler that:
+   - Manages file caching for the repository
+   - Runs test modules against the repository
+   - Tracks errors, warnings, and fixes
+   - Provides utility methods for checking files and languages
+   - Handles file creation/updates with protected-branch awareness
+
+3. **Test Modules** (`github_linter/tests/`) - Pluggable modules that check specific aspects:
+   - Each module must define `CATEGORY`, `LANGUAGES`, and `DEFAULT_CONFIG`
+   - Functions starting with `check_` are discovered and run automatically
+   - Functions starting with `fix_` are run when the `--fix` flag is used
+   - Modules are loaded dynamically in `tests/__init__.py`
+
+### Available Test Modules
+
+- `branch_protection` - Validates and configures branch protection on default branches
+- `codeowners` - Validates CODEOWNERS files
+- `dependabot` - Validates Dependabot configuration
+- `generic` - Checks for unwanted files, CODEOWNERS, FUNDING.yml
+- `github_actions` - Validates GitHub Actions workflows
+- `homebrew` - Homebrew-specific checks
+- `issues` - Reports on open issues and PRs
+- `mkdocs` - Ensures mkdocs projects have proper CI setup
+- `pyproject` - Validates pyproject.toml (authors, naming, configuration)
+- `security_md` - Checks for SECURITY.md
+- `terraform` - Checks Terraform provider configurations
+
+### Module Language Filtering
+
+Modules declare which languages they apply to via the `LANGUAGES` attribute:
+
+- Use `["all"]` for modules that apply to all repositories
+- Use specific languages (e.g., `["python"]`, `["rust"]`) to run only on repos with those languages
+- Language detection is based on GitHub's automatic language detection
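+
+A minimal module skeleton showing these conventions (a hypothetical sketch - the module name, config key, and check are illustrative, not part of the codebase):
+
+```python
+"""example_module - checks that a marker file exists"""
+
+from typing import Any, Dict
+
+from github_linter.repolinter import RepoLinter
+
+CATEGORY = "example_module"
+LANGUAGES = ["all"]  # run against every repository
+DEFAULT_CONFIG: Dict[str, Any] = {"filename": "EXAMPLE.md"}
+
+
+def check_example_file_exists(repo: RepoLinter) -> None:
+    """Discovered and run automatically because the name starts with check_."""
+    filename = repo.config[CATEGORY]["filename"]
+    if repo.cached_get_file(filename) is None:
+        repo.error(CATEGORY, f"{filename} is missing")
+```
+
+### Configuration
+
+Configuration file locations (in priority order):
+
+1. `./github_linter.json` (local directory)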
+2. `~/.config/github_linter.json` (user config)
+
+Each module can define a `DEFAULT_CONFIG`, which is merged with the user configuration.
+
+#### Branch Protection Configuration
+
+The `branch_protection` module supports both legacy branch protection rules and modern GitHub rulesets. It can check existing protection, create new protection, and migrate from legacy rules to rulesets.
+
+```json
+{
+  "branch_protection": {
+    "enable_protection": true,
+    "allow_admin_bypass": true,
+    "require_pull_request": true,
+    "required_approving_review_count": 1,
+    "dismiss_stale_reviews": true,
+    "require_code_owner_review": false,
+    "use_rulesets": true,
+    "migrate_to_rulesets": false,
+    "warn_on_mismatch": true,
+    "language_checks": {
+      "Python": ["pytest", "ruff", "mypy"],
+      "Rust": ["cargo-test", "clippy"],
+      "JavaScript": ["test", "lint"],
+      "TypeScript": ["test", "lint"],
+      "Shell": ["shellcheck"],
+      "Go": ["test", "lint"]
+    }
+  }
+}
+```
+
+**Configuration Options:**
+
+- `enable_protection` - Whether to enable branch protection checks (default: true)
+- `allow_admin_bypass` - Allow repository admins to bypass protection requirements (default: true). For legacy protection, this sets `enforce_admins=false`. For rulesets, this adds the repository admin role (ID 5) to `bypass_actors`.
+- `require_pull_request` - Require a pull request before merging (default: true)
+- `required_approving_review_count` - Number of required PR approvals (default: 1)
+- `dismiss_stale_reviews` - Dismiss stale reviews when new commits are pushed (default: true)
+- `require_code_owner_review` - Require review from code owners (default: false)
+- `use_rulesets` - Prefer GitHub rulesets over legacy branch protection (default: true)
+- `migrate_to_rulesets` - Automatically migrate from legacy protection to rulesets when fixing (default: false)
+- `warn_on_mismatch` - If protection exists but doesn't match the config, warn instead of erroring (default: true)
+- `language_checks` - Map of GitHub language names to required status check names. The module automatically determines which checks to require based on the detected repository languages.
+
+**Legacy vs Rulesets:**
+
+- Legacy branch protection: the traditional branch protection API (one rule per branch)
+- Rulesets: modern GitHub protection (multiple rulesets can apply to a branch and aggregate, with more features)
+- The module detects which system is in use and can work with both
+- With `use_rulesets: true`, new protection is created as rulesets
+- With `migrate_to_rulesets: true`, the fix function will convert legacy protection to rulesets
+- Both systems can coexist; the module checks both and reports on mismatches
+
+**Implementation Notes:**
+
+- Uses PyGithub's internal `_requester` API for rulesets, since PyGithub doesn't natively support them yet
+- The rulesets API requires GitHub API version 2022-11-28
+- Rulesets provide more granular control and organization-wide enforcement
+
+### Exception Handling
+
+The codebase uses custom exceptions for flow control:
+
+- `SkipOnArchived` - Skip the check for archived repositories
+- `SkipOnPrivate` - Skip the check for private repositories
+- `SkipOnPublic` - Skip the check for public repositories
+- `SkipOnProtected` - Skip the check when the default branch is protected
+- `SkipNoLanguage` - Skip the check when the required language is not present
+- `NoChangeNeeded` - Indicates no action is needed
+
+These exceptions are caught in `RepoLinter.run_module()`, which skips the rest of the check/fix function gracefully.
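+
+A hypothetical sketch of that flow (the import path `github_linter.exceptions` and the uv.lock check are assumptions for illustration - adjust the import to wherever the Skip* exceptions actually live):
+
+```python
+from github_linter.exceptions import NoChangeNeeded, SkipNoLanguage  # assumed path
+from github_linter.repolinter import RepoLinter
+
+CATEGORY = "example"
+
+
+def check_python_has_lockfile(repo: RepoLinter) -> None:
+    if "Python" not in repo.repository.get_languages():
+        raise SkipNoLanguage  # caught by run_module(), the check is skipped quietly
+    if repo.cached_get_file("uv.lock") is not None:
+        raise NoChangeNeeded  # caught by run_module(), nothing to report
+    repo.error(CATEGORY, "uv.lock is missing")
+```
+
+## Adding New Test Modules
+
+1. Create a new module under `github_linter/tests/`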
+2. Define required module-level attributes:
+   - `CATEGORY: str` - Display name for reports
+   - `LANGUAGES: List[str]` - Languages this module applies to (or `["all"]`)
+   - `DEFAULT_CONFIG: Dict[str, Any]` - Default configuration
+3. Implement check functions: `def check_<name>(repo: RepoLinter) -> None`
+4. Implement fix functions: `def fix_<name>(repo: RepoLinter) -> None`
+5. Import the module in `github_linter/tests/__init__.py`
+6. Use `repo.error()`, `repo.warning()`, or `repo.fix()` to report results
+
+## Testing Strategy
+
+- Unit tests live in the `/tests/` directory
+- Tests requiring network access/authentication are marked with `@pytest.mark.network`
+- Run tests without network access: `uv run pytest -m "not network"`
+- The codebase uses both the PyGithub and github3.py libraries
+
+## Code Style
+
+- Line length: 200 characters (configured in pyproject.toml)
+- Type checking: strict mypy mode
+- Linting: ruff with the pylint-pydantic plugin
+- Use pydantic for validation where appropriate
+- Use loguru for logging
+
+## Dependencies
+
+- Main GitHub libraries: `pygithub`, `github3-py`
+- Web framework: `fastapi`, `uvicorn`
+- Configuration: `json5`, `pyyaml`, `tomli`, `python-hcl2`
+- Type checking: `mypy` with strict mode
+- Testing: `pytest`
diff --git a/github_linter/__main__.py b/github_linter/__main__.py
index 8e97386..468b461 100644
--- a/github_linter/__main__.py
+++ b/github_linter/__main__.py
@@ -5,9 +5,9 @@
 import click
 from loguru import logger
 
-from . import GithubLinter, search_repos
-from .utils import setup_logging
-from .tests import MODULES, load_modules
+from github_linter import GithubLinter, search_repos
+from github_linter.utils import setup_logging
+from github_linter.tests import MODULES, load_modules
 
 MODULE_CHOICES = [key for key in list(MODULES.keys()) if not key.startswith("github_linter")]
diff --git a/github_linter/types.py b/github_linter/custom_types.py
similarity index 100%
rename from github_linter/types.py
rename to github_linter/custom_types.py
diff --git a/github_linter/repolinter.py b/github_linter/repolinter.py
index 93e0537..d720ec4 100644
--- a/github_linter/repolinter.py
+++ b/github_linter/repolinter.py
@@ -25,7 +25,7 @@
     SkipOnPublic,
 )
 
-from .types import DICTLIST
+from .custom_types import DICTLIST
 from .utils import load_config
diff --git a/github_linter/tests/__init__.py b/github_linter/tests/__init__.py
index c3cd9c5..28608b6 100644
--- a/github_linter/tests/__init__.py
+++ b/github_linter/tests/__init__.py
@@ -8,6 +8,7 @@
 from . 
import ( + branch_protection, # noqa: F401 codeowners, # noqa: F401 dependabot, # noqa: F401 docs, # noqa: F401 diff --git a/github_linter/tests/branch_protection.py b/github_linter/tests/branch_protection.py new file mode 100644 index 0000000..90442b3 --- /dev/null +++ b/github_linter/tests/branch_protection.py @@ -0,0 +1,966 @@ +"""Branch protection checks and fixes using both legacy rules and modern rulesets""" + +from typing import Any, Dict, List, Optional, Set + +from github.BranchProtection import BranchProtection +from github.GithubException import GithubException, UnknownObjectException +from loguru import logger +from pydantic import BaseModel +from ruyaml import YAML + +from ..repolinter import RepoLinter + +CATEGORY = "branch_protection" +LANGUAGES = ["all"] + + +class DefaultConfig(BaseModel): + """Configuration for branch protection module""" + + enable_protection: bool = True # Whether to enable protection + allow_admin_bypass: bool = True # Allow admin bypass + require_pull_request: bool = True # Require PR before merging + required_approving_review_count: int = 0 # Number of required approvals + dismiss_stale_reviews: bool = True # Dismiss stale reviews on push + require_code_owner_review: bool = False # Require code owner review + use_rulesets: bool = True # Prefer rulesets over legacy branch protection + migrate_to_rulesets: bool = True # Migrate legacy protection to rulesets + + # Mapping of language to required status check names + language_checks: Dict[str, List[str]] = { + "Python": ["pylint", "pytest", "mypy"], + "Docker": ["build_container"], + # "Rust": ["cargo-test", "clippy"], + # "JavaScript": ["test", "lint"], + # "TypeScript": ["test", "lint"], + # "Shell": ["shellcheck"], + # "Go": ["test", "lint"], + } + + # Whether to warn (instead of error) if protection exists but doesn't match + warn_on_mismatch: bool = True + + +DEFAULT_CONFIG = DefaultConfig() + + +def _get_required_checks_for_repo(repo: RepoLinter, config: Dict[str, Any]) -> List[str]: + """ + Get the list of required status checks based on repository languages. + + Args: + repo: RepoLinter instance + config: Module configuration + + Returns: + List of required status check names + """ + required_checks: List[str] = [] + repo_languages = list(repo.repository.get_languages().keys()) + + language_checks = config.get("language_checks", {}) + + for language in repo_languages: + if language in language_checks: + checks = language_checks[language] + for check in checks: + if check not in required_checks: + required_checks.append(check) + + logger.debug( + "Required checks for {} (languages: {}): {}", + repo.repository.full_name, + ", ".join(repo_languages), + ", ".join(required_checks) if required_checks else "none", + ) + + return required_checks + + +def _get_available_checks_for_repo(repo: RepoLinter) -> Set[str]: + """ + Get the list of available status checks by parsing workflow files. + + This function attempts to discover actual check names that would run on pull requests + by parsing GitHub Actions workflow files in .github/workflows/ directory. 
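+    For example, a workflow file defining jobs 'pytest' and 'mypy' contributes "pytest" and "mypy" to the result.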
+ + Args: + repo: RepoLinter instance + + Returns: + Set of available check names (job names from workflows) + """ + available_checks: Set[str] = set() + + try: + # Get all workflow files from .github/workflows/ + workflow_path = ".github/workflows" + contents = repo.repository.get_contents(workflow_path) + + # Handle single file or list of files + if not isinstance(contents, list): + contents = [contents] + + for content_file in contents: + # Only process YAML files + if not content_file.name.endswith(('.yml', '.yaml')): + continue + + try: + # Parse the workflow file + file_content = content_file.decoded_content.decode('utf-8') + yaml_parser = YAML(pure=True) + workflow_data = yaml_parser.load(file_content) + + if not isinstance(workflow_data, dict): + logger.debug("Workflow file {} has invalid structure", content_file.name) + continue + + # Extract job names from the 'jobs' section + jobs = workflow_data.get('jobs', {}) + if isinstance(jobs, dict): + for job_name in jobs.keys(): + available_checks.add(job_name) + logger.debug("Found check '{}' in workflow {}", job_name, content_file.name) + + except Exception as exc: + logger.debug("Failed to parse workflow file {}: {}", content_file.name, exc) + continue + + logger.debug( + "Available checks for {}: {}", + repo.repository.full_name, + ", ".join(sorted(available_checks)) if available_checks else "none", + ) + + except UnknownObjectException: + logger.debug("No .github/workflows directory found for {}", repo.repository.full_name) + except GithubException as exc: + logger.debug("Error accessing workflows for {}: {}", repo.repository.full_name, exc) + except Exception as exc: + logger.error("Unexpected error getting available checks for {}: {}", repo.repository.full_name, exc) + + return available_checks + + +def _validate_required_checks( + repo: RepoLinter, + required_checks: List[str], + available_checks: Set[str], +) -> None: + """ + Validate that required status checks actually exist in the repository. + + Compares the configured required checks against the available checks discovered + from workflow files. If there are mismatches, generates a warning with suggestions. + + Args: + repo: RepoLinter instance + required_checks: List of check names required by configuration + available_checks: Set of check names found in workflow files + """ + if not required_checks: + # No required checks, nothing to validate + return + + # Find checks that are required but not available + missing_checks = set(required_checks) - available_checks + + if missing_checks: + # Generate a helpful warning message + warning_parts = [ + f"Required status checks not found in workflow files: {', '.join(sorted(missing_checks))}" + ] + + # Suggest available checks if any exist + if available_checks: + warning_parts.append(f"Available checks in workflows: {', '.join(sorted(available_checks))}") + else: + warning_parts.append("No workflow files found in .github/workflows/") + + warning_parts.append( + "These checks will be required for branch protection but may never pass if workflows don't exist. " + "Update the 'language_checks' configuration or create matching workflow jobs." 
+ ) + + repo.warning( + CATEGORY, + " | ".join(warning_parts), + ) + + logger.debug( + "Validation failed for {}: required={}, available={}, missing={}", + repo.repository.full_name, + sorted(required_checks), + sorted(available_checks), + sorted(missing_checks), + ) + + +def _get_rulesets(repo: RepoLinter) -> List[Dict[str, Any]]: + """ + Get repository rulesets using PyGithub's internal _requester. + + Fetches full ruleset details by first getting the list, then fetching + each ruleset individually to get complete rule and condition information. + + Args: + repo: RepoLinter instance + + Returns: + List of rulesets with full details (empty if none exist or API call fails) + """ + try: + # First, get the list of rulesets (summary view) + list_url = f"{repo.repository.url}/rulesets" + headers = { + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + } + _, response = repo.repository._requester.requestJsonAndCheck("GET", list_url, headers=headers) + ruleset_summaries = response if isinstance(response, list) else [] + + logger.debug( + "Fetched {} ruleset summaries for {}", + len(ruleset_summaries), + repo.repository.full_name, + ) + + # Now fetch full details for each ruleset + full_rulesets: List[Dict[str, Any]] = [] + for summary in ruleset_summaries: + ruleset_id = summary.get("id") + if not ruleset_id: + logger.warning("Ruleset missing ID, skipping: {}", summary) + continue + + try: + detail_url = f"{repo.repository.url}/rulesets/{ruleset_id}" + _, detail_response = repo.repository._requester.requestJsonAndCheck("GET", detail_url, headers=headers) + if isinstance(detail_response, dict): + full_rulesets.append(detail_response) + logger.debug( + "Fetched full details for ruleset '{}' (id={}): has {} rules, conditions={}", + detail_response.get("name"), + ruleset_id, + len(detail_response.get("rules", [])) if detail_response.get("rules") else 0, + "present" if detail_response.get("conditions") else "None", + ) + except GithubException as exc: + logger.error("Error fetching ruleset {}: {}", ruleset_id, exc) + continue + + return full_rulesets + except GithubException as exc: + if exc.status == 404: + logger.debug("No rulesets found for {}", repo.repository.full_name) + return [] + else: + logger.error("Error fetching rulesets for {}: {}", repo.repository.full_name, exc) + return [] + except Exception as exc: + logger.error("Unexpected error fetching rulesets: {}", exc) + return [] + + +def _get_admin_bypass_actors() -> List[Dict[str, Any]]: + """ + Get bypass actors list to allow repository admins to bypass rulesets. + + Returns: + List with repository admin role configured as bypass actor + """ + return [ + { + "actor_id": 5, # Repository admin role ID + "actor_type": "RepositoryRole", + "bypass_mode": "always", + } + ] + + +def _create_ruleset( + repo: RepoLinter, + name: str, + require_pr: bool, + required_approving_review_count: int, + dismiss_stale_reviews: bool, + require_code_owner_review: bool, + required_checks: List[str], + bypass_actors: Optional[List[Dict[str, Any]]] = None, +) -> Optional[Dict[str, Any]]: + """ + Create a repository ruleset using PyGithub's internal _requester. 
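+    The POST body follows GitHub's "create a repository ruleset" REST schema (name, target, enforcement, conditions, rules, bypass_actors) and is sent with the X-GitHub-Api-Version: 2022-11-28 header.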
+ + Args: + repo: RepoLinter instance + name: Ruleset name + require_pr: Whether to require pull requests + required_approving_review_count: Number of required approvals + dismiss_stale_reviews: Whether to dismiss stale reviews on push + require_code_owner_review: Whether to require code owner review + required_checks: List of required status check names + bypass_actors: List of actors who can bypass the ruleset + + Returns: + Created ruleset data, or None if creation failed + """ + rules: List[Dict[str, Any]] = [] + + # Add pull request rule if requested + if require_pr: + pr_rule: Dict[str, Any] = { + "type": "pull_request", + "parameters": { + "required_approving_review_count": required_approving_review_count, + "dismiss_stale_reviews_on_push": dismiss_stale_reviews, + "require_code_owner_review": require_code_owner_review, + "require_last_push_approval": False, + "required_review_thread_resolution": False, + }, + } + rules.append(pr_rule) + + # Add status checks rule if there are any checks + if required_checks: + status_check_rule: Dict[str, Any] = { + "type": "required_status_checks", + "parameters": { + "required_status_checks": [{"context": check} for check in required_checks], + "strict_required_status_checks_policy": False, + }, + } + rules.append(status_check_rule) + + # Build the ruleset payload + ruleset_data = { + "name": name, + "target": "branch", + "enforcement": "active", + "conditions": {"ref_name": {"include": [f"refs/heads/{repo.repository.default_branch}"], "exclude": []}}, + "rules": rules, + "bypass_actors": bypass_actors or [], + } + + try: + url = f"{repo.repository.url}/rulesets" + headers = { + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + } + _, response = repo.repository._requester.requestJsonAndCheck( + "POST", + url, + headers=headers, + input=ruleset_data, + ) + return response if isinstance(response, dict) else None + except GithubException as exc: + logger.error( + "Failed to create ruleset for {}: {}", + repo.repository.full_name, + exc.data.get("message", str(exc)) if hasattr(exc, "data") and isinstance(exc.data, dict) else str(exc), + ) + return None + except Exception as exc: + logger.error("Unexpected error creating ruleset: {}", exc) + return None + + +def _get_branch_protection(repo: RepoLinter) -> Optional[BranchProtection]: + """ + Get branch protection for the default branch. + + Args: + repo: RepoLinter instance + + Returns: + BranchProtection object if protection exists, None otherwise + """ + try: + branch = repo.repository.get_branch(repo.repository.default_branch) + protection = branch.get_protection() + return protection + except GithubException as exc: + if exc.status == 404: + logger.debug( + "No branch protection found for {} on branch {}", + repo.repository.full_name, + repo.repository.default_branch, + ) + return None + else: + logger.error( + "Error getting branch protection for {}: {}", + repo.repository.full_name, + exc, + ) + raise + + +def _delete_branch_protection(repo: RepoLinter) -> bool: + """ + Delete legacy branch protection for the default branch. 
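+    Used during migration to rulesets, after a replacement ruleset has been created, so the two protection systems don't linger side by side.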
+ + Args: + repo: RepoLinter instance + + Returns: + True if deletion was successful, False otherwise + """ + try: + branch = repo.repository.get_branch(repo.repository.default_branch) + branch.remove_protection() + logger.info( + "Deleted legacy branch protection for {} on branch {}", + repo.repository.full_name, + repo.repository.default_branch, + ) + return True + except GithubException as exc: + if exc.status == 404: + logger.debug( + "No branch protection to delete for {} on branch {}", + repo.repository.full_name, + repo.repository.default_branch, + ) + return True # Already gone, consider it successful + else: + logger.error( + "Error deleting branch protection for {}: {}", + repo.repository.full_name, + exc, + ) + return False + + +def _check_protection_matches_config( + protection: BranchProtection, + enforce_admins: bool, + require_pr: bool, + required_approving_review_count: int, + required_checks: List[str], +) -> tuple[bool, List[str]]: + """ + Check if existing protection matches desired configuration. + + Args: + protection: Existing BranchProtection object + enforce_admins: Whether admin bypass should be enabled + require_pr: Whether PR is required + required_approving_review_count: Number of required approvals + required_checks: List of required status check names + + Returns: + Tuple of (matches, list of differences) + """ + differences: List[str] = [] + + # Check enforce_admins setting + # Note: protection.enforce_admins may be a boolean or an object with enabled attribute + if hasattr(protection.enforce_admins, "enabled"): + current_enforce_admins = getattr(protection.enforce_admins, "enabled", False) + else: + current_enforce_admins = bool(protection.enforce_admins) + + if current_enforce_admins != enforce_admins: + differences.append(f"enforce_admins: current={current_enforce_admins}, expected={enforce_admins}") + + # Check pull request requirements + if require_pr: + if not protection.required_pull_request_reviews: + differences.append("pull request reviews not required") + else: + current_approvals = protection.required_pull_request_reviews.required_approving_review_count + if current_approvals != required_approving_review_count: + differences.append(f"required approvals: current={current_approvals}, expected={required_approving_review_count}") + + # Check required status checks + if protection.required_status_checks: + current_checks = [] + if hasattr(protection.required_status_checks, "checks") and protection.required_status_checks.checks: + # Newer API format + current_checks = [check.context for check in protection.required_status_checks.checks] + elif hasattr(protection.required_status_checks, "contexts") and protection.required_status_checks.contexts: + # Legacy API format + current_checks = list(protection.required_status_checks.contexts) + + # Compare sets to ignore order + if set(current_checks) != set(required_checks): + missing = set(required_checks) - set(current_checks) + extra = set(current_checks) - set(required_checks) + if missing: + differences.append(f"missing required checks: {', '.join(missing)}") + if extra: + differences.append(f"extra checks not in config: {', '.join(extra)}") + elif required_checks: + differences.append(f"no status checks configured, expected: {', '.join(required_checks)}") + + return len(differences) == 0, differences + + +def _check_ruleset_matches_config( + ruleset: Dict[str, Any], + require_pr: bool, + required_approving_review_count: int, + required_checks: List[str], + allow_admin_bypass: bool, +) -> tuple[bool, 
List[str]]: + """ + Check if existing ruleset matches desired configuration. + + Args: + ruleset: Ruleset data from API + require_pr: Whether PR is required + required_approving_review_count: Number of required approvals + required_checks: List of required status check names + allow_admin_bypass: Whether repository admins should be able to bypass + + Returns: + Tuple of (matches, list of differences) + """ + differences: List[str] = [] + + rules = ruleset.get("rules", []) + + # Check for PR rule + pr_rule = next((r for r in rules if r.get("type") == "pull_request"), None) + if require_pr: + if not pr_rule: + differences.append("pull request rule not found") + else: + params = pr_rule.get("parameters", {}) + current_approvals = params.get("required_approving_review_count", 0) + if current_approvals != required_approving_review_count: + differences.append(f"required approvals: current={current_approvals}, expected={required_approving_review_count}") + + # Check for status checks rule + status_rule = next((r for r in rules if r.get("type") == "required_status_checks"), None) + if required_checks: + if not status_rule: + differences.append("required status checks rule not found") + else: + params = status_rule.get("parameters", {}) + current_checks_data = params.get("required_status_checks", []) + current_checks = [check.get("context", "") for check in current_checks_data] + + if set(current_checks) != set(required_checks): + missing = set(required_checks) - set(current_checks) + extra = set(current_checks) - set(required_checks) + if missing: + differences.append(f"missing required checks: {', '.join(missing)}") + if extra: + differences.append(f"extra checks not in config: {', '.join(extra)}") + + # Check bypass actors for admin bypass + bypass_actors = ruleset.get("bypass_actors", []) + has_admin_bypass = any(actor.get("actor_type") == "RepositoryRole" and actor.get("actor_id") == 5 for actor in bypass_actors) + if allow_admin_bypass and not has_admin_bypass: + differences.append("admin bypass not configured (repository admin role not in bypass_actors)") + elif not allow_admin_bypass and has_admin_bypass: + differences.append("admin bypass enabled but config has allow_admin_bypass=False") + + return len(differences) == 0, differences + + +def check_default_branch_protection(repo: RepoLinter) -> None: + """ + Check if branch protection is enabled on the default branch. + + Checks both legacy branch protection and modern rulesets. 
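+    Rulesets are evaluated first (when use_rulesets is enabled); any legacy protection is then checked as well, and mismatches are reported as warnings or errors depending on warn_on_mismatch.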
+ + Verifies: + - Branch protection or rulesets exist + - Admin bypass is configured correctly (legacy only) + - Pull request requirements + - Required status checks match repository languages + """ + repo.skip_on_archived() + + config = repo.config.get("branch_protection", {}) + if not config.get("enable_protection", True): + logger.debug("Branch protection checks disabled in config for {}", repo.repository.full_name) + return + + use_rulesets = config.get("use_rulesets", True) + require_pr = config.get("require_pull_request", True) + required_approving_review_count = config.get("required_approving_review_count", 1) + # Note: enforce_admins=False means admins CAN bypass, enforce_admins=True means they CANNOT + # So we invert allow_admin_bypass to get the correct enforce_admins value + allow_admin_bypass = config.get("allow_admin_bypass", True) + enforce_admins = not allow_admin_bypass + required_checks = _get_required_checks_for_repo(repo, config) + + # Validate that required checks exist in workflow files + available_checks = _get_available_checks_for_repo(repo) + _validate_required_checks(repo, required_checks, available_checks) + + # Check for rulesets first if enabled + rulesets = _get_rulesets(repo) if use_rulesets else [] + protection = _get_branch_protection(repo) + + if not rulesets and protection is None: + repo.error( + CATEGORY, + f"No protection configured on default branch '{repo.repository.default_branch}' (neither rulesets nor legacy branch protection)", + ) + return + + # If we have both, check rulesets first + if rulesets: + # Find ruleset targeting the default branch + # Note: conditions can be None in the list API response, in which case + # we need to assume the ruleset applies to all branches (including default) + default_branch_rulesets = [ + rs + for rs in rulesets + if rs.get("target") == "branch" + and ( + rs.get("conditions") is None # No conditions = applies to all branches + or any( + f"refs/heads/{repo.repository.default_branch}" in incl or repo.repository.default_branch in incl + for incl in (rs.get("conditions") or {}).get("ref_name", {}).get("include", []) + ) + ) + ] + + logger.debug( + "Filtered rulesets for default branch '{}': found {} out of {} total rulesets", + repo.repository.default_branch, + len(default_branch_rulesets), + len(rulesets), + ) + + if default_branch_rulesets: + for ruleset in default_branch_rulesets: + matches, differences = _check_ruleset_matches_config( + ruleset, + require_pr, + required_approving_review_count, + required_checks, + allow_admin_bypass, + ) + + if not matches: + warn_on_mismatch = config.get("warn_on_mismatch", True) + message = f"Ruleset '{ruleset.get('name')}' on '{repo.repository.default_branch}' doesn't match config: {'; '.join(differences)}" + + if warn_on_mismatch: + repo.warning(CATEGORY, message) + else: + repo.error(CATEGORY, message) + + # Also check legacy branch protection if it exists + if protection is not None: + migrate_to_rulesets = config.get("migrate_to_rulesets", False) + + # Warn about using legacy protection if rulesets are preferred and no rulesets exist + if use_rulesets and not rulesets: + if migrate_to_rulesets: + repo.warning( + CATEGORY, + f"Repository uses legacy branch protection on '{repo.repository.default_branch}' - migration enabled, run with --fix to migrate to rulesets", + ) + else: + repo.warning( + CATEGORY, + f"Repository uses legacy branch protection on '{repo.repository.default_branch}' - consider enabling migration with 'migrate_to_rulesets: true' in config", + ) + + 
matches, differences = _check_protection_matches_config( + protection, + enforce_admins, + require_pr, + required_approving_review_count, + required_checks, + ) + + if not matches: + warn_on_mismatch = config.get("warn_on_mismatch", True) + message = f"Legacy branch protection on '{repo.repository.default_branch}' doesn't match config: {'; '.join(differences)}" + + if warn_on_mismatch: + repo.warning(CATEGORY, message) + else: + repo.error(CATEGORY, message) + + +def check_legacy_protection_cleanup(repo: RepoLinter) -> None: + """ + Check if legacy branch protection should be removed when rulesets exist. + + When migrate_to_rulesets is enabled and rulesets are already in place, + this check warns if legacy branch protection still exists and should be cleaned up. + """ + repo.skip_on_archived() + + config = repo.config.get("branch_protection", {}) + if not config.get("enable_protection", True): + return + + migrate_to_rulesets = config.get("migrate_to_rulesets", False) + use_rulesets = config.get("use_rulesets", True) + + # Only check if migration is enabled + if not migrate_to_rulesets or not use_rulesets: + return + + rulesets = _get_rulesets(repo) + protection = _get_branch_protection(repo) + + # If we have rulesets AND legacy protection, warn that legacy should be removed + if rulesets and protection is not None: + # Find rulesets targeting the default branch + # Note: conditions can be None in the list API response, in which case + # we need to assume the ruleset applies to all branches (including default) + default_branch_rulesets = [ + rs + for rs in rulesets + if rs.get("target") == "branch" + and ( + rs.get("conditions") is None # No conditions = applies to all branches + or any( + f"refs/heads/{repo.repository.default_branch}" in incl or repo.repository.default_branch in incl + for incl in (rs.get("conditions") or {}).get("ref_name", {}).get("include", []) + ) + ) + ] + + logger.debug( + "check_legacy_protection_cleanup: Filtered rulesets for default branch '{}': found {} out of {} total rulesets", + repo.repository.default_branch, + len(default_branch_rulesets), + len(rulesets), + ) + + if default_branch_rulesets: + repo.warning( + CATEGORY, + f"Legacy branch protection still exists on '{repo.repository.default_branch}' alongside rulesets - run with --fix to remove legacy protection", + ) + + +def fix_default_branch_protection(repo: RepoLinter) -> None: + """ + Enable branch protection on the default branch if not already enabled. + + Configures: + - Admin bypass (allow admins to bypass protection) - legacy only + - Pull request requirements + - Required status checks based on repository languages + + Prefers rulesets over legacy branch protection when use_rulesets is enabled. + Can migrate from legacy protection to rulesets if migrate_to_rulesets is enabled. + + Note: This will NOT modify existing protection rules per the warn_on_mismatch setting. 
+ """ + repo.skip_on_archived() + + config = repo.config.get("branch_protection", {}) + if not config.get("enable_protection", True): + logger.debug("Branch protection disabled in config for {}", repo.repository.full_name) + return + + use_rulesets = config.get("use_rulesets", True) + migrate_to_rulesets = config.get("migrate_to_rulesets", False) + require_pr = config.get("require_pull_request", True) + required_approving_review_count = config.get("required_approving_review_count", 1) + dismiss_stale_reviews = config.get("dismiss_stale_reviews", True) + require_code_owner_review = config.get("require_code_owner_review", False) + # Note: enforce_admins=False means admins CAN bypass, enforce_admins=True means they CANNOT + # So we invert allow_admin_bypass to get the correct enforce_admins value + allow_admin_bypass = config.get("allow_admin_bypass", True) + enforce_admins = not allow_admin_bypass + required_checks = _get_required_checks_for_repo(repo, config) + + # Validate that required checks exist in workflow files + available_checks = _get_available_checks_for_repo(repo) + _validate_required_checks(repo, required_checks, available_checks) + + rulesets = _get_rulesets(repo) if use_rulesets else [] + protection = _get_branch_protection(repo) + + # If migration is requested and we have legacy protection but no rulesets + if migrate_to_rulesets and protection is not None and not rulesets and use_rulesets: + logger.info("Migrating {} from legacy branch protection to rulesets", repo.repository.full_name) + bypass_actors = _get_admin_bypass_actors() if allow_admin_bypass else None + result = _create_ruleset( + repo, + f"Protect {repo.repository.default_branch}", + require_pr, + required_approving_review_count, + dismiss_stale_reviews, + require_code_owner_review, + required_checks, + bypass_actors, + ) + if result: + # Delete the legacy branch protection after successful ruleset creation + if _delete_branch_protection(repo): + repo.fix( + CATEGORY, + f"Migrated '{repo.repository.default_branch}' from legacy branch protection to ruleset '{result.get('name')}' and removed legacy protection", + ) + else: + repo.fix( + CATEGORY, + f"Created ruleset '{result.get('name')}' for '{repo.repository.default_branch}' but failed to remove legacy protection (manual cleanup required)", + ) + return + else: + repo.error(CATEGORY, f"Failed to migrate '{repo.repository.default_branch}' to rulesets") + return + + # If we already have rulesets or protection, skip + if rulesets or protection is not None: + logger.debug( + "Protection already exists for {} on branch '{}', skipping fix", + repo.repository.full_name, + repo.repository.default_branch, + ) + return + + # No protection exists, create it + if use_rulesets: + # Create a ruleset + bypass_actors = _get_admin_bypass_actors() if allow_admin_bypass else None + result = _create_ruleset( + repo, + f"Protect {repo.repository.default_branch}", + require_pr, + required_approving_review_count, + dismiss_stale_reviews, + require_code_owner_review, + required_checks, + bypass_actors, + ) + if result: + rules_desc = [] + if require_pr: + rules_desc.append(f"require {required_approving_review_count} approval(s)") + if required_checks: + rules_desc.append(f"required checks: {', '.join(required_checks)}") + + repo.fix( + CATEGORY, + f"Created ruleset '{result.get('name')}' for '{repo.repository.default_branch}' with {'; '.join(rules_desc) if rules_desc else 'basic protection'}", + ) + else: + repo.error(CATEGORY, f"Failed to create ruleset for 
'{repo.repository.default_branch}'") + else: + # Use legacy branch protection + try: + branch = repo.repository.get_branch(repo.repository.default_branch) + + # Build parameters for edit_protection + protection_params: Dict[str, Any] = { + "enforce_admins": enforce_admins, + } + + # Add PR requirements if requested + if require_pr: + protection_params["required_approving_review_count"] = required_approving_review_count + protection_params["dismiss_stale_reviews"] = dismiss_stale_reviews + protection_params["require_code_owner_reviews"] = require_code_owner_review + + # Add status checks if any + if required_checks: + protection_params["strict"] = False + protection_params["checks"] = list(required_checks) + + branch.edit_protection(**protection_params) + + rules_desc = [] + if enforce_admins: + rules_desc.append("admin bypass") + if require_pr: + rules_desc.append(f"require {required_approving_review_count} approval(s)") + if required_checks: + rules_desc.append(f"required checks: {', '.join(required_checks)}") + + repo.fix( + CATEGORY, + f"Enabled legacy branch protection on '{repo.repository.default_branch}' with {'; '.join(rules_desc) if rules_desc else 'basic protection'}", + ) + + except GithubException as exc: + logger.error( + "Failed to enable branch protection for {} on branch '{}': {}", + repo.repository.full_name, + repo.repository.default_branch, + exc, + ) + repo.error( + CATEGORY, + f"Failed to enable branch protection: {exc.data.get('message', str(exc)) if hasattr(exc, 'data') and isinstance(exc.data, dict) else str(exc)}", + ) + + +def fix_legacy_protection_cleanup(repo: RepoLinter) -> None: + """ + Remove legacy branch protection when rulesets exist. + + This fix function runs when migrate_to_rulesets is enabled and both + rulesets and legacy branch protection exist. It removes the legacy + protection to complete the migration. + """ + repo.skip_on_archived() + + config = repo.config.get("branch_protection", {}) + if not config.get("enable_protection", True): + return + + migrate_to_rulesets = config.get("migrate_to_rulesets", False) + use_rulesets = config.get("use_rulesets", True) + + # Only fix if migration is enabled + if not migrate_to_rulesets or not use_rulesets: + return + + rulesets = _get_rulesets(repo) + protection = _get_branch_protection(repo) + + # If we have rulesets AND legacy protection, remove the legacy protection + if rulesets and protection is not None: + # Find rulesets targeting the default branch + # Note: conditions can be None in the list API response, in which case + # we need to assume the ruleset applies to all branches (including default) + default_branch_rulesets = [ + rs + for rs in rulesets + if rs.get("target") == "branch" + and ( + rs.get("conditions") is None # No conditions = applies to all branches + or any( + f"refs/heads/{repo.repository.default_branch}" in incl or repo.repository.default_branch in incl + for incl in (rs.get("conditions") or {}).get("ref_name", {}).get("include", []) + ) + ) + ] + + logger.debug( + "fix_legacy_protection_cleanup: Filtered rulesets for default branch '{}': found {} out of {} total rulesets", + repo.repository.default_branch, + len(default_branch_rulesets), + len(rulesets), + ) + + if not default_branch_rulesets: + logger.warning( + "fix_legacy_protection_cleanup: No rulesets match default branch '{}'. Skipping legacy protection cleanup. 
" + "This might indicate a bug in the filter logic.", + repo.repository.default_branch, + ) + return + + if default_branch_rulesets: + if _delete_branch_protection(repo): + repo.fix( + CATEGORY, + f"Removed legacy branch protection from '{repo.repository.default_branch}' (rulesets already in place)", + ) + else: + repo.error( + CATEGORY, + f"Failed to remove legacy branch protection from '{repo.repository.default_branch}'", + ) diff --git a/github_linter/tests/dependabot/__init__.py b/github_linter/tests/dependabot/__init__.py index fd752c3..75c3c0a 100644 --- a/github_linter/tests/dependabot/__init__.py +++ b/github_linter/tests/dependabot/__init__.py @@ -74,9 +74,7 @@ def generate_expected_update_config( updates: List[DependabotUpdateConfig] = [] for language in repo.repository.get_languages(): if find_language_in_ecosystem(language): - logger.debug( - "Found lang/eco: {}, {}", language, find_language_in_ecosystem(language) - ) + logger.debug("Found lang/eco: {}, {}", language, find_language_in_ecosystem(language)) new_config = DependabotUpdateConfig.model_validate( { "package-ecosystem": find_language_in_ecosystem(language), @@ -183,22 +181,14 @@ def check_updates_for_languages(repo: RepoLinter) -> None: # check that the repo has full coverage if set(required_package_managers) != set(package_managers_covered): - missing_package_managers = [ - manager - for manager in required_package_managers - if manager not in package_managers_covered - ] + missing_package_managers = [manager for manager in required_package_managers if manager not in package_managers_covered] for manager in missing_package_managers: repo.error( CATEGORY, f"Package manager needs to be configured for {manager}", ) - extra_package_managers = [ - manager - for manager in package_managers_covered - if manager not in required_package_managers - ] + extra_package_managers = [manager for manager in package_managers_covered if manager not in required_package_managers] for extra_manager in extra_package_managers: repo.error( @@ -300,10 +290,7 @@ def check_dependabot_automerge_workflow(repo: RepoLinter) -> None: fileresult = repo.get_file(filepath) if fileresult is None or fileresult.decoded_content.decode("utf-8").strip() == "": return repo.error(CATEGORY, f"{filepath} missing") - if ( - fileresult.decoded_content.decode("utf-8") - != get_fix_file_path(category=CATEGORY, filename=filepath).read_text() - ): + if fileresult.decoded_content.decode("utf-8") != get_fix_file_path(category=CATEGORY, filename=filepath).read_text(): repo.warning(CATEGORY, f"Content differs for {filepath}") # show the diff between the two files # repo.diff_file( @@ -336,19 +323,14 @@ def fix_dependabot_automerge_workflow(repo: RepoLinter) -> None: message=f"Created {filepath}", ) return repo.fix(CATEGORY, f"Created {filepath}, commit url: {result}") - if ( - fileresult.decoded_content.decode("utf-8") - != get_fix_file_path(category=CATEGORY, filename=filepath).read_text() - ): + if fileresult.decoded_content.decode("utf-8") != get_fix_file_path(category=CATEGORY, filename=filepath).read_text(): result = repo.create_or_update_file( filepath=filepath, newfile=get_fix_file_path(category=CATEGORY, filename=filepath), oldfile=fileresult, message=f"Updated {filepath} to latest version", ) - return repo.fix( - CATEGORY, f"Updated {filepath} to latest version, commit url: {result}" - ) + return repo.fix(CATEGORY, f"Updated {filepath} to latest version, commit url: {result}") logger.debug("{} already exists and has the right contents!", filepath) return None @@ 
-378,10 +360,7 @@ def fix_create_dependabot_config(repo: RepoLinter) -> None: expected_config = generate_expected_update_config(repo) - updates = [ - val.dict(by_alias=True, exclude_unset=True, exclude_none=True) - for val in expected_config.updates - ] + updates = [val.dict(by_alias=True, exclude_unset=True, exclude_none=True) for val in expected_config.updates] update_dict = { "version": expected_config.version, "updates": updates, @@ -397,9 +376,7 @@ def fix_create_dependabot_config(repo: RepoLinter) -> None: newfilecontents = buf.read() logger.debug("New contents: \n{}", newfilecontents) # raise NotImplementedError - if newfilecontents != repo.cached_get_file( - repo.config[CATEGORY]["config_filename"], True - ): + if newfilecontents != repo.cached_get_file(repo.config[CATEGORY]["config_filename"], True): logger.debug("Updating file") try: result = repo.create_or_update_file( @@ -438,16 +415,12 @@ def check_repository_automerge(repo: RepoLinter) -> None: try: configured = res.json().get("allow_auto_merge") except JSONDecodeError as error: - repo.error( - CATEGORY, f"Failed to decode JSON while checking allow_auto_merge: {error}" - ) + repo.error(CATEGORY, f"Failed to decode JSON while checking allow_auto_merge: {error}") return None if configured is None: repo.error(CATEGORY, "None result in allow_auto_merge") if configured == expected_result: - logger.debug( - "allow_auto_merge matches expected setting, is set to {}", configured - ) + logger.debug("allow_auto_merge matches expected setting, is set to {}", configured) else: repo.error( CATEGORY, @@ -473,13 +446,8 @@ def fix_repository_automerge(repo: RepoLinter) -> None: # url = f"/repos/{repo.repository.full_name}" res = repo.repository3._patch(url=repo.repository3.url, json=request_body) logger.debug(res.json()) - if ( - res.status_code == 200 - and res.json().get("allow_auto_merge") == auto_merge_setting - ): - repo.fix( - CATEGORY, f"Updated repository auto-merge setting to {auto_merge_setting}" - ) + if res.status_code == 200 and res.json().get("allow_auto_merge") == auto_merge_setting: + repo.fix(CATEGORY, f"Updated repository auto-merge setting to {auto_merge_setting}") else: repo.error( CATEGORY, diff --git a/tests/test_branch_protection.py b/tests/test_branch_protection.py new file mode 100644 index 0000000..434bf3d --- /dev/null +++ b/tests/test_branch_protection.py @@ -0,0 +1,287 @@ +"""Tests for branch protection module""" + +from typing import List +from unittest.mock import Mock + +from github_linter.repolinter import RepoLinter +from github_linter.tests.branch_protection import ( + _get_available_checks_for_repo, + _validate_required_checks, +) + + +def create_mock_workflow_file(name: str, content: str) -> Mock: + """Create a mock workflow file ContentFile object""" + mock_file = Mock() + mock_file.name = name + mock_file.decoded_content = content.encode('utf-8') + return mock_file + + +def test_get_available_checks_with_simple_workflow() -> None: + """Test parsing a simple workflow file with jobs""" + + workflow_content = """ +name: Test Workflow +on: [push, pull_request] + +jobs: + pytest: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + mypy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + ruff: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 +""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + + # Mock get_contents to return our test workflow + workflow_files = [ + create_mock_workflow_file("test.yml", 
workflow_content) + ] + mock_repo.repository.get_contents.return_value = workflow_files + + # Call the function + available_checks = _get_available_checks_for_repo(mock_repo) + + # Verify results + assert "pytest" in available_checks + assert "mypy" in available_checks + assert "ruff" in available_checks + assert len(available_checks) == 3 + + +def test_get_available_checks_with_multiple_workflows() -> None: + """Test parsing multiple workflow files""" + + workflow1 = """ +name: Python Tests +jobs: + pytest: + runs-on: ubuntu-latest + steps: + - run: pytest +""" + + workflow2 = """ +name: Linting +jobs: + ruff: + runs-on: ubuntu-latest + steps: + - run: ruff check + + mypy: + runs-on: ubuntu-latest + steps: + - run: mypy +""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + + # Mock get_contents to return multiple workflow files + workflow_files = [ + create_mock_workflow_file("test.yml", workflow1), + create_mock_workflow_file("lint.yaml", workflow2), + ] + mock_repo.repository.get_contents.return_value = workflow_files + + # Call the function + available_checks = _get_available_checks_for_repo(mock_repo) + + # Verify results + assert "pytest" in available_checks + assert "mypy" in available_checks + assert "ruff" in available_checks + assert len(available_checks) == 3 + + +def test_get_available_checks_with_no_workflows() -> None: + """Test when .github/workflows directory doesn't exist""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + mock_repo.repository.full_name = "test/repo" + + # Mock get_contents to raise UnknownObjectException + from github.GithubException import UnknownObjectException + mock_repo.repository.get_contents.side_effect = UnknownObjectException( + status=404, + data={"message": "Not Found"}, + headers={} + ) + + # Call the function + available_checks = _get_available_checks_for_repo(mock_repo) + + # Should return empty set + assert len(available_checks) == 0 + + +def test_get_available_checks_with_invalid_yaml() -> None: + """Test handling of invalid YAML in workflow files""" + + invalid_yaml = """ +name: Invalid +jobs: + test: + this is not valid yaml: [[[ +""" + + valid_yaml = """ +name: Valid +jobs: + pytest: + runs-on: ubuntu-latest +""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + mock_repo.repository.full_name = "test/repo" + + # Mock get_contents with one invalid and one valid workflow + workflow_files = [ + create_mock_workflow_file("invalid.yml", invalid_yaml), + create_mock_workflow_file("valid.yml", valid_yaml), + ] + mock_repo.repository.get_contents.return_value = workflow_files + + # Call the function - should skip invalid file and process valid one + available_checks = _get_available_checks_for_repo(mock_repo) + + # Should only get checks from valid file + assert "pytest" in available_checks + assert len(available_checks) == 1 + + +def test_get_available_checks_ignores_non_yaml_files() -> None: + """Test that non-YAML files are ignored""" + + workflow_yaml = """ +name: Test +jobs: + pytest: + runs-on: ubuntu-latest +""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + + # Mock get_contents with YAML and non-YAML files + files = [ + create_mock_workflow_file("test.yml", workflow_yaml), + create_mock_workflow_file("README.md", "# Not a workflow"), + create_mock_workflow_file("script.sh", "#!/bin/bash\necho test"), + ] + 
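+    # Only the .yml file should be parsed; README.md and script.sh are skipped by the extension filter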
mock_repo.repository.get_contents.return_value = files + + # Call the function + available_checks = _get_available_checks_for_repo(mock_repo) + + # Should only process .yml file + assert "pytest" in available_checks + assert len(available_checks) == 1 + + +def test_validate_required_checks_all_present() -> None: + """Test validation when all required checks are available""" + + # Create mock repository with no warnings expected + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + mock_repo.repository.full_name = "test/repo" + + required_checks = ["pytest", "mypy", "ruff"] + available_checks = {"pytest", "mypy", "ruff", "extra-check"} + + # Call validation - should not raise any warnings + _validate_required_checks(mock_repo, required_checks, available_checks) + + # Verify no warnings were issued + mock_repo.warning.assert_not_called() + + +def test_validate_required_checks_some_missing() -> None: + """Test validation when some required checks are missing""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + mock_repo.repository.full_name = "test/repo" + + required_checks = ["pytest", "mypy", "pylint"] + available_checks = {"pytest", "ruff"} # mypy and pylint are missing + + # Call validation - should issue a warning + _validate_required_checks(mock_repo, required_checks, available_checks) + + # Verify warning was issued + mock_repo.warning.assert_called_once() + + # Check warning message contains missing checks + warning_call = mock_repo.warning.call_args + warning_message = warning_call[0][1] # Second argument is the message + + assert "mypy" in warning_message + assert "pylint" in warning_message + assert "Available checks in workflows" in warning_message + assert "pytest" in warning_message + assert "ruff" in warning_message + + +def test_validate_required_checks_none_available() -> None: + """Test validation when no workflows exist at all""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + mock_repo.repository = Mock() + mock_repo.repository.full_name = "test/repo" + + required_checks = ["pytest", "mypy"] + available_checks: set[str] = set() # No workflows found + + # Call validation - should issue a warning + _validate_required_checks(mock_repo, required_checks, available_checks) + + # Verify warning was issued + mock_repo.warning.assert_called_once() + + # Check warning message mentions no workflows + warning_call = mock_repo.warning.call_args + warning_message = warning_call[0][1] + + assert "pytest" in warning_message + assert "mypy" in warning_message + assert "No workflow files found" in warning_message + + +def test_validate_required_checks_empty_required() -> None: + """Test validation when no checks are required""" + + # Create mock repository + mock_repo = Mock(spec=RepoLinter) + + required_checks: List[str] = [] + available_checks: set[str] = {"pytest", "mypy"} + + # Call validation - should do nothing + _validate_required_checks(mock_repo, required_checks, available_checks) + + # Verify no warnings were issued + mock_repo.warning.assert_not_called()