diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index b98b4e5..1341d3a 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -24,6 +24,7 @@ jobs: - uses: actions/checkout@v4 - name: Install the latest version of uv uses: astral-sh/setup-uv@v5 + - run: uv python pin cp311 - name: Install dependencies run: uv sync - name: Format diff --git a/pyproject.toml b/pyproject.toml index 283c5e7..763cebf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ license = {text = "MIT"} requires-python = ">=3.11" dependencies = [ "mcp>=1.8.0", - "mysql-connector-python>=9.1.0", + "mysql-connector-python==9.5.0", "pyyaml>=6.0.2", "aiohttp>=3.9.0", ] diff --git a/src/greptimedb_mcp_server/server.py b/src/greptimedb_mcp_server/server.py index 3fe50a4..7d21e00 100644 --- a/src/greptimedb_mcp_server/server.py +++ b/src/greptimedb_mcp_server/server.py @@ -374,7 +374,7 @@ async def execute_tql( ], end: Annotated[ str, - "End time: SQL expression (e.g., 'now()'), " "RFC3339, or Unix timestamp", + "End time: SQL expression (e.g., 'now()'), RFC3339, or Unix timestamp", ], step: Annotated[str, "Query resolution step, e.g., '1m', '5m', '1h'"], lookback: Annotated[str | None, "Lookback delta for range queries"] = None, @@ -722,16 +722,14 @@ async def create_pipeline( pipelines = result.get("pipelines", []) version = pipelines[0]["version"] if pipelines else "unknown" return ( - f"Pipeline '{name}' created successfully.\n" - f"Version: {version}" + f"Pipeline '{name}' created successfully.\nVersion: {version}" ) except (json.JSONDecodeError, KeyError, IndexError): return f"Pipeline '{name}' created successfully." 
else: error_detail = response_text if response_text else "No details" return ( - f"Error creating pipeline (HTTP {response.status}): " - f"{error_detail}" + f"Error creating pipeline (HTTP {response.status}): {error_detail}" ) except aiohttp.ClientError as e: @@ -741,24 +739,59 @@ async def create_pipeline( @mcp.tool() async def dryrun_pipeline( - pipeline_name: Annotated[str, "Name of the pipeline to test"], - data: Annotated[str, "Test data in JSON format (single object or array)"], + pipeline: Annotated[ + str | None, + "Pipeline configuration in YAML format (inline). Provide this to test a pipeline without saving it.", + ] = None, + pipeline_name: Annotated[ + str | None, + "Name of the saved pipeline to test. Provide either 'pipeline' or 'pipeline_name', not both.", + ] = None, + data: Annotated[ + str, "Test data in JSON or NDJSON format (single object or array)" + ] = "", + data_type: Annotated[ + str | None, + "Content type of the data (e.g., 'application/x-ndjson'). If omitted, GreptimeDB will use default.", + ] = None, ) -> str: - """Test a pipeline with sample data without writing to the database.""" + """Test a pipeline with sample data without writing to the database. 
+ + You can test a pipeline in two ways: + - Provide 'pipeline' with inline YAML configuration + - Provide 'pipeline_name' to test a previously saved pipeline + + Args: + pipeline: Pipeline YAML configuration (inline) + pipeline_name: Name of saved pipeline (mutually exclusive with pipeline) + data: Test data in JSON/NDJSON format + data_type: Optional content type (e.g., 'application/x-ndjson') + """ state = get_state() - pipeline_name = _validate_pipeline_name(pipeline_name) - try: - parsed = json.loads(data) - normalized_data = json.dumps(parsed, ensure_ascii=False) - except json.JSONDecodeError as e: - return f"Error: Invalid JSON data: {str(e)}" + if not data or not data.strip(): + return "Error: data parameter is required" + + if pipeline is not None and pipeline_name is not None: + return "Error: Provide either 'pipeline' or 'pipeline_name', not both" + + if pipeline is None and pipeline_name is None: + return "Error: Provide either 'pipeline' or 'pipeline_name'" + + if pipeline_name is not None: + pipeline_name = _validate_pipeline_name(pipeline_name) url = f"{state.http_base_url}/v1/pipelines/_dryrun" - request_body = { - "pipeline_name": pipeline_name, - "data": normalized_data, - } + request_body = {"data": data} + + if data_type: + request_body["data_type"] = data_type + + if pipeline is not None: + request_body["pipeline"] = pipeline + elif pipeline_name is not None: + request_body["pipeline_name"] = pipeline_name + auth = state.get_http_auth() logger.debug(f"Dryrun request URL: {url}") logger.debug(f"Dryrun request body: {request_body}") @@ -780,12 +813,11 @@ async def dryrun_pipeline( else: error_detail = response_text if response_text else "No details" return ( - f"Error testing pipeline (HTTP {response.status}): " - f"{error_detail}" + f"Error testing pipeline (HTTP {response.status}): {error_detail}" ) except aiohttp.ClientError as e: - logger.error(f"HTTP error testing pipeline '{pipeline_name}': {e}") + logger.error(f"HTTP error testing 
pipeline: {e}") return f"Error testing pipeline: {str(e)}" @@ -813,8 +845,7 @@ async def delete_pipeline( else: error_detail = response_text if response_text else "No details" return ( - f"Error deleting pipeline (HTTP {response.status}): " - f"{error_detail}" + f"Error deleting pipeline (HTTP {response.status}): {error_detail}" ) except aiohttp.ClientError as e: diff --git a/src/greptimedb_mcp_server/templates/pipeline_creator/template.md b/src/greptimedb_mcp_server/templates/pipeline_creator/template.md index b3d5723..07cfcee 100644 --- a/src/greptimedb_mcp_server/templates/pipeline_creator/template.md +++ b/src/greptimedb_mcp_server/templates/pipeline_creator/template.md @@ -132,31 +132,42 @@ transform: Generate a complete, valid YAML pipeline configuration. After generation: 1. Use `create_pipeline` tool to create the pipeline -2. Use `dryrun_pipeline` tool to verify with sample data +2. Use `dryrun_pipeline` tool with inline pipeline YAML to verify with sample data **Note**: You can update an existing pipeline by calling `create_pipeline` with the same name. Each call creates a new version. Use `list_pipelines` to view all versions, and `delete_pipeline` to remove specific versions. 
## Testing with dryrun_pipeline -The `dryrun_pipeline` tool accepts JSON data in the following formats: +Use `dryrun_pipeline` with separate parameters: -**Single log entry (JSON object with "message" field for plain text logs):** -```json -{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"} +**Example with inline pipeline YAML:** +```python +dryrun_pipeline( + pipeline='''version: 2 +processors: + - date: + fields: + - timestamp + formats: + - '%Y-%m-%dT%H:%M:%SZ' ''', + data='{"timestamp": "2024-05-25T20:16:37Z", "level": "INFO"}', + data_type='application/json' +) ``` -**Multiple log entries (JSON array):** -```json -[ - {"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"}, - {"message": "192.168.1.1 - - [25/May/2024:20:17:37 +0000] \"POST /api/login HTTP/1.1\" 200 1784"} -] +**Example with saved pipeline:** +```python +dryrun_pipeline( + pipeline_name='my_log_pipeline', + data='{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}', + data_type='application/x-ndjson' +) ``` -**Structured JSON logs (fields map directly to pipeline input):** -```json -{"timestamp": "2024-05-25 20:16:37", "level": "INFO", "service": "api", "message": "Request processed"} -``` +**Data Formats:** +- **Single log entry:** `{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}` +- **Multiple entries (JSON array):** `[{"message": "log1"}, {"message": "log2"}]` +- **NDJSON (newline-delimited):** Use `data_type='application/x-ndjson'` with data like `'{"msg":"line1"}\n{"msg":"line2"}'` ## Common Log Format Examples @@ -213,6 +224,8 @@ Pattern: `%{timestamp} %{hostname} %{app}[%{pid}]: %{message}` ## Troubleshooting If `dryrun_pipeline` fails: +- **Missing required parameters**: Ensure you provide `data` and exactly one of `pipeline` or `pipeline_name` +- **Both pipeline and pipeline_name provided**: Only provide one of them - **Pattern mismatch**: Check if dissect/regex pattern matches the log 
format exactly - **Date format error**: Verify the date format string matches the timestamp in logs - **Missing fields**: Use `ignore_missing: true` in processors to handle optional fields @@ -230,11 +243,11 @@ curl -X POST "http://localhost:4000/v1/pipelines/my_pipeline" \ -H "Content-Type: application/x-yaml" \ -d @pipeline.yaml -# Dryrun pipeline +# Dryrun pipeline (constructs JSON request internally) curl -X POST "http://localhost:4000/v1/pipelines/_dryrun" \ -u ":" \ -H "Content-Type: application/json" \ - -d '{"pipeline_name": "my_pipeline", "data": "{\"message\": \"test log entry\"}"}' + -d '{"pipeline": "version: 2", "data": "{\"timestamp\": \"2024-05-25T20:16:37Z\"}", "data_type": "application/json"}' # Delete pipeline curl -X DELETE "http://localhost:4000/v1/pipelines/my_pipeline?version=" \ diff --git a/tests/test_server.py b/tests/test_server.py index 5a6b71f..386d3a3 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -592,18 +592,34 @@ async def test_create_pipeline_invalid_name(): @pytest.mark.asyncio -async def test_dryrun_pipeline_invalid_name(): - """Test dryrun_pipeline with invalid name""" +async def test_dryrun_pipeline_invalid_pipeline_name(): + """Test dryrun_pipeline with invalid pipeline name""" with pytest.raises(ValueError) as excinfo: await dryrun_pipeline(pipeline_name="123-invalid", data='{"message": "test"}') assert "Invalid pipeline name" in str(excinfo.value) @pytest.mark.asyncio -async def test_dryrun_pipeline_invalid_json(): - """Test dryrun_pipeline with invalid JSON data""" - result = await dryrun_pipeline(pipeline_name="test_pipeline", data="invalid json") - assert "Error: Invalid JSON data" in result +async def test_dryrun_pipeline_missing_data(): + """Test dryrun_pipeline without required data parameter""" + result = await dryrun_pipeline(pipeline="version: 2", data="") + assert "Error: data parameter is required" in result + + +@pytest.mark.asyncio +async def test_dryrun_pipeline_both_pipeline_and_name(): + 
"""Test dryrun_pipeline with both pipeline and pipeline_name (should error)""" + result = await dryrun_pipeline( + pipeline="version: 2", pipeline_name="test_pipeline", data='{"message": "test"}' + ) + assert "Error: Provide either 'pipeline' or 'pipeline_name', not both" in result + + +@pytest.mark.asyncio +async def test_dryrun_pipeline_neither_pipeline_nor_name(): + """Test dryrun_pipeline without pipeline or pipeline_name (should error)""" + result = await dryrun_pipeline(data='{"message": "test"}') + assert "Error: Provide either 'pipeline' or 'pipeline_name'" in result @pytest.mark.asyncio