From 16ebec63de890a4ebb6c7523aba82216664be7c9 Mon Sep 17 00:00:00 2001 From: jkosik Date: Sat, 30 Aug 2025 13:30:26 +0200 Subject: [PATCH 1/2] conditional tool/prompt deregistration --- chart/semgrep-mcp/values.yaml | 26 +- src/semgrep_mcp/server.py | 980 +++++++++++++++++----------------- 2 files changed, 524 insertions(+), 482 deletions(-) diff --git a/chart/semgrep-mcp/values.yaml b/chart/semgrep-mcp/values.yaml index 7353bff..80496d8 100644 --- a/chart/semgrep-mcp/values.yaml +++ b/chart/semgrep-mcp/values.yaml @@ -23,5 +23,27 @@ ingress: resources: {} env: [] -# - name: SEMGREP_APP_TOKEN -# value: "your-token-here" \ No newline at end of file + # - name: SEMGREP_APP_TOKEN + # value: "your-token-here" # consider mounting as a secret + - name: SEMGREP_URL + value: "your-semgrep-url-here" + - name: SEMGREP_RULE_SCHEMA_DISABLED + value: "true" + - name: SEMGREP_SUPPORTED_LANGUAGES_DISABLED + value: "true" + - name: SEMGREP_FINDINGS_DISABLED + value: "true" + - name: SEMGREP_SCAN_CUSTOM_RULE_DISABLED + value: "true" + - name: SEMGREP_SCAN_DISABLED + value: "true" + # - name: SEMGREP_SCAN_RPC_DISABLED + # value: "true" + - name: SEMGREP_SCAN_LOCAL_DISABLED + value: "true" + - name: SECURITY_CHECK_DISABLED + value: "true" + - name: SEMGREP_AST_DISABLED + value: "true" + - name: SEMGREP_RULE_PROMPT_DISABLED + value: "true" \ No newline at end of file diff --git a/src/semgrep_mcp/server.py b/src/semgrep_mcp/server.py index 6a46b2f..69691fe 100755 --- a/src/semgrep_mcp/server.py +++ b/src/semgrep_mcp/server.py @@ -352,45 +352,49 @@ async def server_lifespan(_server: FastMCP) -> AsyncIterator[SemgrepContext]: # --------------------------------------------------------------------------------- -@mcp.tool() -@with_tool_span() -async def semgrep_rule_schema(ctx: Context) -> str: - """ - Get the schema for a Semgrep rule - - Use this tool when you need to: - - get the schema required to write a Semgrep rule - - need to see what fields are available for a Semgrep rule - - verify what fields are available for a Semgrep rule - - verify the syntax for a Semgrep rule is correct - """ - try: - response = await http_client.get(f"{SEMGREP_API_URL}/schema_url") - response.raise_for_status() - data: dict[str, str] = response.json() - schema_url: str = data["schema_url"] - response = await http_client.get(schema_url) - response.raise_for_status() - return str(response.text) - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error getting schema for Semgrep rule: {e!s}") - ) from e +# Conditionally register if SEMGREP_RULE_SCHEMA_DISABLED is not true +if os.environ.get("SEMGREP_RULE_SCHEMA_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def semgrep_rule_schema(ctx: Context) -> str: + """ + Get the schema for a Semgrep rule + + Use this tool when you need to: + - get the schema required to write a Semgrep rule + - need to see what fields are available for a Semgrep rule + - verify what fields are available for a Semgrep rule + - verify the syntax for a Semgrep rule is correct + """ + try: + response = await http_client.get(f"{SEMGREP_API_URL}/schema_url") + response.raise_for_status() + data: dict[str, str] = response.json() + schema_url: str = data["schema_url"] + response = await http_client.get(schema_url) + response.raise_for_status() + return str(response.text) + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error getting schema for Semgrep rule: {e!s}") + ) from e -@mcp.tool() -@with_tool_span() -async def get_supported_languages(ctx: Context) -> list[str]: - """ - Returns a list of supported languages by Semgrep +# Conditionally register if GET_SUPPORTED_LANGUAGES_DISABLED is not true +if os.environ.get("GET_SUPPORTED_LANGUAGES_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def get_supported_languages(ctx: Context) -> list[str]: + """ + Returns a list of supported languages by Semgrep - Only use this tool if you are not sure what languages Semgrep supports. - """ - args = ["show", "supported-languages", "--experimental"] + Only use this tool if you are not sure what languages Semgrep supports. + """ + args = ["show", "supported-languages", "--experimental"] - # Parse output and return list of languages - languages = await run_semgrep_output(top_level_span=None, args=args) - return [lang.strip() for lang in languages.strip().split("\n") if lang.strip()] + # Parse output and return list of languages + languages = await run_semgrep_output(top_level_span=None, args=args) + return [lang.strip() for lang in languages.strip().split("\n") if lang.strip()] async def get_deployment_slug() -> str: @@ -465,372 +469,384 @@ async def get_deployment_slug() -> str: ) from e -@mcp.tool() -@with_tool_span() -async def semgrep_findings( - ctx: Context, - issue_type: list[str] = ["sast", "sca"], # noqa: B006 - repos: list[str] = None, # pyright: ignore # noqa: RUF013 - status: str = "open", - severities: list[str] = None, # pyright: ignore # noqa: RUF013 - confidence: list[str] = None, # pyright: ignore # noqa: RUF013 - autotriage_verdict: str = "true_positive", - page: int = 0, - page_size: int = 100, -) -> list[Finding]: - """ - Fetches findings from the Semgrep AppSec Platform Findings API. - - This function retrieves security, code quality, and supply chain findings that have already been - identified by previous Semgrep scans and uploaded to the Semgrep AppSec platform. It does NOT - perform a new scan or analyze code directly. Instead, it queries the Semgrep API to access - historical scan results for a given repository or set of repositories. - - DEFAULT BEHAVIOR: By default, this tool should filter by the current repository. The model - should determine the current repository name and pass it in the 'repos' parameter to ensure - findings are scoped to the relevant codebase. However, users may explicitly request findings - from other repositories, in which case the model should respect that request. - - Use this function when a prompt requests a summary, list, or analysis of existing findings, - such as: - - "Please list the top 10 security findings and propose solutions for them." - - "Show all open critical vulnerabilities in this repository." - - "Summarize the most recent Semgrep scan results." - - "Get findings from repository X" (explicitly requesting different repo) - - This function is ideal for: - - Reviewing, listing, or summarizing findings from past scans. - - Providing actionable insights or remediation advice based on existing scan data. - - Do NOT use this function to perform a new scan or check code that has not yet been analyzed by - Semgrep. For new scans, use the appropriate scanning function. - - Args: - issue_type (Optional[List[str]]): Filter findings by type. Use 'sast' for code analysis - findings and 'sca' for supply chain analysis findings (e.g., ['sast'], ['sca']). - status (Optional[str]): Filter findings by status (default: 'open'). - repos (Optional[List[str]]): List of repository names to filter results. By default, should - include the current repository name to scope findings appropriately. Can be overridden - when users explicitly request findings from other repositories. - severities (Optional[List[str]]): Filter findings by severity (e.g., ['critical', 'high']). - confidence (Optional[List[str]]): Filter findings by confidence level (e.g., ['high']). - autotriage_verdict (Optional[str]): Filter findings by auto-triage verdict - (default: 'true_positive'). - page (Optional[int]): Page number for paginated results. (default: 0) - page_size (int): Number of findings per page (default: 100, min: 100, max: 3000). - - Returns: - List[Finding]: A list of findings matching the specified filters, where each finding - contains details such as rule ID, description, severity, file location, and remediation - guidance if available. - """ - allowed_issue_types = {"sast", "sca"} - if not set(issue_type).issubset(allowed_issue_types): - invalid_types = ", ".join(set(issue_type) - allowed_issue_types) - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message=f"Invalid issue_type(s): {invalid_types}. " - "Allowed values are 'sast' and 'sca'.", +# Conditionally register if SEMGREP_FINDINGS_DISABLED is not true +if os.environ.get("SEMGREP_FINDINGS_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def semgrep_findings( + ctx: Context, + issue_type: list[str] = ["sast", "sca"], # noqa: B006 + repos: list[str] = None, # pyright: ignore # noqa: RUF013 + status: str = "open", + severities: list[str] = None, # pyright: ignore # noqa: RUF013 + confidence: list[str] = None, # pyright: ignore # noqa: RUF013 + autotriage_verdict: str = "true_positive", + page: int = 0, + page_size: int = 100, + ) -> list[Finding]: + """ + Fetches findings from the Semgrep AppSec Platform Findings API. + + This function retrieves security, code quality, and supply chain findings that have already been + identified by previous Semgrep scans and uploaded to the Semgrep AppSec platform. It does NOT + perform a new scan or analyze code directly. Instead, it queries the Semgrep API to access + historical scan results for a given repository or set of repositories. + + DEFAULT BEHAVIOR: By default, this tool should filter by the current repository. The model + should determine the current repository name and pass it in the 'repos' parameter to ensure + findings are scoped to the relevant codebase. However, users may explicitly request findings + from other repositories, in which case the model should respect that request. + + Use this function when a prompt requests a summary, list, or analysis of existing findings, + such as: + - "Please list the top 10 security findings and propose solutions for them." + - "Show all open critical vulnerabilities in this repository." + - "Summarize the most recent Semgrep scan results." + - "Get findings from repository X" (explicitly requesting different repo) + + This function is ideal for: + - Reviewing, listing, or summarizing findings from past scans. + - Providing actionable insights or remediation advice based on existing scan data. + + Do NOT use this function to perform a new scan or check code that has not yet been analyzed by + Semgrep. For new scans, use the appropriate scanning function. + + Args: + issue_type (Optional[List[str]]): Filter findings by type. Use 'sast' for code analysis + findings and 'sca' for supply chain analysis findings (e.g., ['sast'], ['sca']). + status (Optional[str]): Filter findings by status (default: 'open'). + repos (Optional[List[str]]): List of repository names to filter results. By default, should + include the current repository name to scope findings appropriately. Can be overridden + when users explicitly request findings from other repositories. + severities (Optional[List[str]]): Filter findings by severity (e.g., ['critical', 'high']). + confidence (Optional[List[str]]): Filter findings by confidence level (e.g., ['high']). + autotriage_verdict (Optional[str]): Filter findings by auto-triage verdict + (default: 'true_positive'). + page (Optional[int]): Page number for paginated results. (default: 0) + page_size (int): Number of findings per page (default: 100, min: 100, max: 3000). + + Returns: + List[Finding]: A list of findings matching the specified filters, where each finding + contains details such as rule ID, description, severity, file location, and remediation + guidance if available. + """ + allowed_issue_types = {"sast", "sca"} + if not set(issue_type).issubset(allowed_issue_types): + invalid_types = ", ".join(set(issue_type) - allowed_issue_types) + raise McpError( + ErrorData( + code=INVALID_PARAMS, + message=f"Invalid issue_type(s): {invalid_types}. " + "Allowed values are 'sast' and 'sca'.", + ) ) - ) - - if not (100 <= page_size <= 3000): - raise McpError( - ErrorData(code=INVALID_PARAMS, message="page_size must be between 100 and 3000.") - ) - deployment = await get_deployment_slug() - api_token = get_semgrep_app_token() - if not api_token: - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message="SEMGREP_APP_TOKEN environment variable must be set to use this tool. " - "Create a token at semgrep.dev to continue.", + if not (100 <= page_size <= 3000): + raise McpError( + ErrorData(code=INVALID_PARAMS, message="page_size must be between 100 and 3000.") ) - ) - url = f"https://semgrep.dev/api/v1/deployments/{deployment}/findings" - headers = {"Authorization": f"Bearer {api_token}", "Accept": "application/json"} - - params_to_filter: dict[str, Any] = { - "issue_type": issue_type, - "status": status, - "repos": ",".join(repos) if repos else None, - "severities": severities, - "confidence": confidence, - "autotriage_verdict": autotriage_verdict, - "page": page, - "page_size": page_size, - } - params = {k: v for k, v in params_to_filter.items() if v is not None} - - try: - response = await http_client.get(url, headers=headers, params=params) - response.raise_for_status() - data = response.json() - return [Finding.model_validate(finding) for finding in data.get("findings", [])] - except httpx.HTTPStatusError as e: - if e.response.status_code == 401: + deployment = await get_deployment_slug() + api_token = get_semgrep_app_token() + if not api_token: raise McpError( ErrorData( code=INVALID_PARAMS, - message="Invalid API token: check your SEMGREP_APP_TOKEN environment variable.", + message="SEMGREP_APP_TOKEN environment variable must be set to use this tool. " + "Create a token at semgrep.dev to continue.", ) - ) from e - elif e.response.status_code == 404: + ) + + url = f"https://semgrep.dev/api/v1/deployments/{deployment}/findings" + headers = {"Authorization": f"Bearer {api_token}", "Accept": "application/json"} + + params_to_filter: dict[str, Any] = { + "issue_type": issue_type, + "status": status, + "repos": ",".join(repos) if repos else None, + "severities": severities, + "confidence": confidence, + "autotriage_verdict": autotriage_verdict, + "page": page, + "page_size": page_size, + } + params = {k: v for k, v in params_to_filter.items() if v is not None} + + try: + response = await http_client.get(url, headers=headers, params=params) + response.raise_for_status() + data = response.json() + return [Finding.model_validate(finding) for finding in data.get("findings", [])] + except httpx.HTTPStatusError as e: + if e.response.status_code == 401: + raise McpError( + ErrorData( + code=INVALID_PARAMS, + message="Invalid API token: check your SEMGREP_APP_TOKEN environment variable.", + ) + ) from e + elif e.response.status_code == 404: + raise McpError( + ErrorData( + code=INVALID_PARAMS, + message=f"Deployment '{deployment}' not found or you don't have access to it.", + ) + ) from e + else: + raise McpError( + ErrorData( + code=INTERNAL_ERROR, + message=f"Error fetching findings: {e.response.text}", + ) + ) from e + except ValidationError as e: raise McpError( - ErrorData( - code=INVALID_PARAMS, - message=f"Deployment '{deployment}' not found or you don't have access to it.", - ) + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") ) from e - else: + except Exception as e: raise McpError( - ErrorData( - code=INTERNAL_ERROR, - message=f"Error fetching findings: {e.response.text}", - ) + ErrorData(code=INTERNAL_ERROR, message=f"Error fetching findings from Semgrep: {e!s}") ) from e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error fetching findings from Semgrep: {e!s}") - ) from e - - -@mcp.tool() -@with_tool_span() -async def semgrep_scan_with_custom_rule( - ctx: Context, - code_files: list[dict[str, str]] = CODE_FILES_FIELD, - rule: str = RULE_FIELD, -) -> SemgrepScanResult: - """ - Runs a Semgrep scan with a custom rule on provided code content - and returns the findings in JSON format - - Use this tool when you need to: - - scan code files for specific security vulnerability not covered by the default Semgrep rules - - scan code files for specific issue not covered by the default Semgrep rules - """ - # Validate code_files - validated_code_files = validate_code_files(code_files) - temp_dir = None - try: - # Create temporary files from code content - temp_dir = create_temp_files_from_code_content(validated_code_files) - # Write rule to file - rule_file_path = os.path.join(temp_dir, "rule.yaml") - with open(rule_file_path, "w") as f: - f.write(rule) - - # Run semgrep scan with custom rule - args = get_semgrep_scan_args(temp_dir, rule_file_path) - output = await run_semgrep_output(top_level_span=None, args=args) - results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) - remove_temp_dir_from_results(results, temp_dir) - return results - - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) - - -@mcp.tool() -@with_tool_span() -async def semgrep_scan( - ctx: Context, - code_files: list[dict[str, str]] = CODE_FILES_FIELD, - config: str | None = CONFIG_FIELD, -) -> SemgrepScanResult | CliOutput: - """ - Runs a Semgrep scan on provided code content and returns the findings in JSON format - - Depending on whether `USE_SEMGREP_RPC` is set, this tool will either run a `pysemgrep` - CLI scan, or an RPC-based scan. - - Respectively, this will cause us to return either a `SemgrepScanResult` or a `CliOutput`. - - Use this tool when you need to: - - scan code files for security vulnerabilities - - scan code files for other issues - """ - # Validate config - config = validate_config(config) - - # Validate code_files - validated_code_files = validate_code_files(code_files) - - temp_dir = None - try: - # Create temporary files from code content - temp_dir = create_temp_files_from_code_content(validated_code_files) - args = get_semgrep_scan_args(temp_dir, config) - output = await run_semgrep_output(top_level_span=None, args=args) - results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) - remove_temp_dir_from_results(results, temp_dir) - return results - - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) - -@mcp.tool() -@with_tool_span() -async def semgrep_scan_rpc( - ctx: Context, - code_files: list[dict[str, str]] = CODE_FILES_FIELD, -) -> CliOutput: - """ - Runs a Semgrep scan on provided code content using the new Semgrep RPC feature. - - This should run much faster than the comparative `semgrep_scan` tool. - - Use this tool when you need to: - - scan code files for security vulnerabilities - - scan code files for other issues - - scan quickly - """ - # Validate code_files - # TODO: could this be slow if content is big? - validated_code_files = validate_code_files(code_files) - - temp_dir = None - try: - # TODO: perhaps should return more interpretable results? - context: SemgrepContext = ctx.request_context.lifespan_context - cli_output = await run_semgrep_via_rpc(context, validated_code_files) - return cli_output - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) +# Conditionally register if SEMGREP_SCAN_WITH_CUSTOM_RULE_DISABLED is not true +if os.environ.get("SEMGREP_SCAN_WITH_CUSTOM_RULE_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def semgrep_scan_with_custom_rule( + ctx: Context, + code_files: list[dict[str, str]] = CODE_FILES_FIELD, + rule: str = RULE_FIELD, + ) -> SemgrepScanResult: + """ + Runs a Semgrep scan with a custom rule on provided code content + and returns the findings in JSON format + + Use this tool when you need to: + - scan code files for specific security vulnerability not covered by the default Semgrep rules + - scan code files for specific issue not covered by the default Semgrep rules + """ + # Validate code_files + validated_code_files = validate_code_files(code_files) + temp_dir = None + try: + # Create temporary files from code content + temp_dir = create_temp_files_from_code_content(validated_code_files) + # Write rule to file + rule_file_path = os.path.join(temp_dir, "rule.yaml") + with open(rule_file_path, "w") as f: + f.write(rule) + + # Run semgrep scan with custom rule + args = get_semgrep_scan_args(temp_dir, rule_file_path) + output = await run_semgrep_output(top_level_span=None, args=args) + results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) + remove_temp_dir_from_results(results, temp_dir) + return results -@mcp.tool() -@with_tool_span() -async def semgrep_scan_local( - ctx: Context, - code_files: list[dict[str, str]] = LOCAL_CODE_FILES_FIELD, - config: str | None = CONFIG_FIELD, -) -> list[SemgrepScanResult]: - """ - Runs a Semgrep scan locally on provided code files returns the findings in JSON format. - - Files are expected to be in the current paths are absolute paths to the code files. - - Use this tool when you need to: - - scan code files for security vulnerabilities - - scan code files for other issues - """ - import os - - if not os.environ.get("SEMGREP_ALLOW_LOCAL_SCAN"): - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message=( - "Local Semgrep scans are not allowed unless SEMGREP_ALLOW_LOCAL_SCAN is set" - ), - ) - ) - # Validate config - config = validate_config(config) - - validated_local_files = validate_local_files(code_files) + except McpError as e: + raise e + except ValidationError as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") + ) from e + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") + ) from e - temp_dir = None - try: - results = [] - for cf in validated_local_files: - args = get_semgrep_scan_args(cf.path, config) + finally: + if temp_dir: + # Clean up temporary files + shutil.rmtree(temp_dir, ignore_errors=True) + + +# Conditionally register if SEMGREP_SCAN_DISABLED is not true +if os.environ.get("SEMGREP_SCAN_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def semgrep_scan( + ctx: Context, + code_files: list[dict[str, str]] = CODE_FILES_FIELD, + config: str | None = CONFIG_FIELD, + ) -> SemgrepScanResult | CliOutput: + """ + Runs a Semgrep scan on provided code content and returns the findings in JSON format + + Depending on whether `USE_SEMGREP_RPC` is set, this tool will either run a `pysemgrep` + CLI scan, or an RPC-based scan. + + Respectively, this will cause us to return either a `SemgrepScanResult` or a `CliOutput`. + + Use this tool when you need to: + - scan code files for security vulnerabilities + - scan code files for other issues + """ + # Validate config + config = validate_config(config) + + # Validate code_files + validated_code_files = validate_code_files(code_files) + + temp_dir = None + try: + # Create temporary files from code content + temp_dir = create_temp_files_from_code_content(validated_code_files) + args = get_semgrep_scan_args(temp_dir, config) output = await run_semgrep_output(top_level_span=None, args=args) - result: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) - results.append(result) - return results + results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) + remove_temp_dir_from_results(results, temp_dir) + return results - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) + except McpError as e: + raise e + except ValidationError as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") + ) from e + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") + ) from e + finally: + if temp_dir: + # Clean up temporary files + shutil.rmtree(temp_dir, ignore_errors=True) + + +# Conditionally register if SEMGREP_SCAN_RPC_DISABLED is not true +if os.environ.get("SEMGREP_SCAN_RPC_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def semgrep_scan_rpc( + ctx: Context, + code_files: list[dict[str, str]] = CODE_FILES_FIELD, + ) -> CliOutput: + """ + Runs a Semgrep scan on provided code content using the new Semgrep RPC feature. + + This should run much faster than the comparative `semgrep_scan` tool. + + Use this tool when you need to: + - scan code files for security vulnerabilities + - scan code files for other issues + - scan quickly + """ + # Validate code_files + # TODO: could this be slow if content is big? + validated_code_files = validate_code_files(code_files) + + temp_dir = None + try: + # TODO: perhaps should return more interpretable results? + context: SemgrepContext = ctx.request_context.lifespan_context + cli_output = await run_semgrep_via_rpc(context, validated_code_files) + return cli_output + except McpError as e: + raise e + except ValidationError as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") + ) from e + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") + ) from e -@mcp.tool() -@with_tool_span() -async def security_check( - ctx: Context, - code_files: list[dict[str, str]] = CODE_FILES_FIELD, -) -> str: - """ - Runs a fast security check on code and returns any issues found. + finally: + if temp_dir: + # Clean up temporary files + shutil.rmtree(temp_dir, ignore_errors=True) + + +# Conditionally register if SEMGREP_SCAN_LOCAL_DISABLED is not true +if os.environ.get("SEMGREP_SCAN_LOCAL_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def semgrep_scan_local( + ctx: Context, + code_files: list[dict[str, str]] = LOCAL_CODE_FILES_FIELD, + config: str | None = CONFIG_FIELD, + ) -> list[SemgrepScanResult]: + """ + Runs a Semgrep scan locally on provided code files returns the findings in JSON format. + + Files are expected to be in the current paths are absolute paths to the code files. + + Use this tool when you need to: + - scan code files for security vulnerabilities + - scan code files for other issues + """ + import os + + if not os.environ.get("SEMGREP_ALLOW_LOCAL_SCAN"): + raise McpError( + ErrorData( + code=INVALID_PARAMS, + message=( + "Local Semgrep scans are not allowed unless SEMGREP_ALLOW_LOCAL_SCAN is set" + ), + ) + ) + # Validate config + config = validate_config(config) - Use this tool when you need to: - - scan code for security vulnerabilities - - verify that code is secure - - double check that code is secure before committing - - get a second opinion on code security + validated_local_files = validate_local_files(code_files) - If there are any issues found, you **MUST** fix them or offer to fix them and - explain to the user why it's important to fix. - If there are no issues, you can be reasonably confident that the code is secure. - """ - # Validate code_files - validated_code_files = validate_code_files(code_files) + temp_dir = None + try: + results = [] + for cf in validated_local_files: + args = get_semgrep_scan_args(cf.path, config) + output = await run_semgrep_output(top_level_span=None, args=args) + result: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) + results.append(result) + return results + + except McpError as e: + raise e + except ValidationError as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") + ) from e + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") + ) from e - no_findings_message = """No security issues found in the code!""" - security_issues_found_message_template = """{num_issues} security issues found in the code. + finally: + if temp_dir: + # Clean up temporary files + shutil.rmtree(temp_dir, ignore_errors=True) + + +# Conditionally register if SECURITY_CHECK_DISABLED is not true +if os.environ.get("SECURITY_CHECK_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def security_check( + ctx: Context, + code_files: list[dict[str, str]] = CODE_FILES_FIELD, + ) -> str: + """ + Runs a fast security check on code and returns any issues found. + + Use this tool when you need to: + - scan code for security vulnerabilities + - verify that code is secure + - double check that code is secure before committing + - get a second opinion on code security + + If there are any issues found, you **MUST** fix them or offer to fix them and + explain to the user why it's important to fix. + If there are no issues, you can be reasonably confident that the code is secure. + """ + # Validate code_files + validated_code_files = validate_code_files(code_files) + + no_findings_message = """No security issues found in the code!""" + security_issues_found_message_template = """{num_issues} security issues found in the code. Here are the details of the security issues found: @@ -838,98 +854,100 @@ async def security_check( {details} """ - temp_dir = None - try: - # Create temporary files from code content - temp_dir = create_temp_files_from_code_content(validated_code_files) - args = get_semgrep_scan_args(temp_dir) - output = await run_semgrep_output(top_level_span=None, args=args) - results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) - remove_temp_dir_from_results(results, temp_dir) - - if len(results.results) > 0: - return security_issues_found_message_template.format( - num_issues=len(results.results), - details=results.model_dump_json(indent=2), - ) - else: - return no_findings_message - - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) - + temp_dir = None + try: + # Create temporary files from code content + temp_dir = create_temp_files_from_code_content(validated_code_files) + args = get_semgrep_scan_args(temp_dir) + output = await run_semgrep_output(top_level_span=None, args=args) + results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) + remove_temp_dir_from_results(results, temp_dir) -@mcp.tool() -@with_tool_span() -async def get_abstract_syntax_tree( - ctx: Context, - code: str = Field(description="The code to get the AST for"), - language: str = Field(description="The programming language of the code"), -) -> str: - """ - Returns the Abstract Syntax Tree (AST) for the provided code file in JSON format + if len(results.results) > 0: + return security_issues_found_message_template.format( + num_issues=len(results.results), + details=results.model_dump_json(indent=2), + ) + else: + return no_findings_message - Use this tool when you need to: - - get the Abstract Syntax Tree (AST) for the provided code file\ - - get the AST of a file - - understand the structure of the code in a more granular way - - see what a parser sees in the code - """ - temp_dir = None - temp_file_path = "" - try: - # Create temporary directory and file for AST generation - temp_dir = tempfile.mkdtemp(prefix="semgrep_ast_") - temp_file_path = os.path.join(temp_dir, "code.txt") # safe - - # Write content to file - with open(temp_file_path, "w") as f: - f.write(code) - - args = [ - "--experimental", - "--dump-ast", - "-l", - language, - "--json", - temp_file_path, - ] - return await run_semgrep_output(top_level_span=None, args=args) + except McpError as e: + raise e + except ValidationError as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") + ) from e + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") + ) from e - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except OSError as e: - raise McpError( - ErrorData( - code=INTERNAL_ERROR, - message=f"Failed to create or write to file {temp_file_path}: {e!s}", - ) - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) + finally: + if temp_dir: + # Clean up temporary files + shutil.rmtree(temp_dir, ignore_errors=True) + + +# Conditionally register if GET_ABSTRACT_SYNTAX_TREE_DISABLED is not true +if os.environ.get("GET_ABSTRACT_SYNTAX_TREE_DISABLED", "").lower() != "true": + @mcp.tool() + @with_tool_span() + async def get_abstract_syntax_tree( + ctx: Context, + code: str = Field(description="The code to get the AST for"), + language: str = Field(description="The programming language of the code"), + ) -> str: + """ + Returns the Abstract Syntax Tree (AST) for the provided code file in JSON format + + Use this tool when you need to: + - get the Abstract Syntax Tree (AST) for the provided code file\ + - get the AST of a file + - understand the structure of the code in a more granular way + - see what a parser sees in the code + """ + temp_dir = None + temp_file_path = "" + try: + # Create temporary directory and file for AST generation + temp_dir = tempfile.mkdtemp(prefix="semgrep_ast_") + temp_file_path = os.path.join(temp_dir, "code.txt") # safe + + # Write content to file + with open(temp_file_path, "w") as f: + f.write(code) + + args = [ + "--experimental", + "--dump-ast", + "-l", + language, + "--json", + temp_file_path, + ] + return await run_semgrep_output(top_level_span=None, args=args) + + except McpError as e: + raise e + except ValidationError as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") + ) from e + except OSError as e: + raise McpError( + ErrorData( + code=INTERNAL_ERROR, + message=f"Failed to create or write to file {temp_file_path}: {e!s}", + ) + ) from e + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") + ) from e + finally: + if temp_dir: + # Clean up temporary files + shutil.rmtree(temp_dir, ignore_errors=True) # --------------------------------------------------------------------------------- @@ -937,20 +955,22 @@ async def get_abstract_syntax_tree( # --------------------------------------------------------------------------------- -@mcp.prompt() -def write_custom_semgrep_rule( - code: str = Field(description="The code to get the AST for"), - language: str = Field(description="The programming language of the code"), -) -> str: - """ - Write a custom Semgrep rule for the provided code and language +# Conditionally register if WRITE_CUSTOM_SEMGREP_RULE_DISABLED is not true +if os.environ.get("WRITE_CUSTOM_SEMGREP_RULE_DISABLED", "").lower() != "true": + @mcp.prompt() + def write_custom_semgrep_rule( + code: str = Field(description="The code to get the AST for"), + language: str = Field(description="The programming language of the code"), + ) -> str: + """ + Write a custom Semgrep rule for the provided code and language - Use this prompt when you need to: - - write a custom Semgrep rule - - write a Semgrep rule for a specific issue or pattern - """ + Use this prompt when you need to: + - write a custom Semgrep rule + - write a Semgrep rule for a specific issue or pattern + """ - prompt_template = """You are an expert at writing Semgrep rules. + prompt_template = """You are an expert at writing Semgrep rules. Your task is to analyze a given piece of code and create a Semgrep rule that can detect specific patterns or issues within that code. @@ -1008,7 +1028,7 @@ def write_custom_semgrep_rule( Ensure that the rule is properly formatted in YAML. Make sure to include all the required keys and values in the rule.""" - return prompt_template.format(code=code, language=language) + return prompt_template.format(code=code, language=language) # --------------------------------------------------------------------------------- From 6c082661978963b1715f93247f015e41b1125225 Mon Sep 17 00:00:00 2001 From: jkosik Date: Sat, 30 Aug 2025 17:45:08 +0200 Subject: [PATCH 2/2] fixed vars --- chart/semgrep-mcp/values.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/chart/semgrep-mcp/values.yaml b/chart/semgrep-mcp/values.yaml index 80496d8..31c3825 100644 --- a/chart/semgrep-mcp/values.yaml +++ b/chart/semgrep-mcp/values.yaml @@ -29,11 +29,11 @@ env: [] value: "your-semgrep-url-here" - name: SEMGREP_RULE_SCHEMA_DISABLED value: "true" - - name: SEMGREP_SUPPORTED_LANGUAGES_DISABLED + - name: GET_SUPPORTED_LANGUAGES_DISABLED value: "true" - name: SEMGREP_FINDINGS_DISABLED value: "true" - - name: SEMGREP_SCAN_CUSTOM_RULE_DISABLED + - name: SEMGREP_SCAN_WITH_CUSTOM_RULE_DISABLED value: "true" - name: SEMGREP_SCAN_DISABLED value: "true" @@ -43,7 +43,7 @@ env: [] value: "true" - name: SECURITY_CHECK_DISABLED value: "true" - - name: SEMGREP_AST_DISABLED + - name: GET_ABSTRACT_SYNTAX_TREE_DISABLED value: "true" - - name: SEMGREP_RULE_PROMPT_DISABLED - value: "true" \ No newline at end of file + - name: WRITE_CUSTOM_SEMGREP_RULE_DISABLED + value: "true" \ No newline at end of file