Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jobs:
- uses: actions/checkout@v4
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v5
- run: uv python pin cp311
- name: Install dependencies
run: uv sync
- name: Format
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = {text = "MIT"}
requires-python = ">=3.11"
dependencies = [
"mcp>=1.8.0",
"mysql-connector-python>=9.1.0",
"mysql-connector-python==9.5.0",
"pyyaml>=6.0.2",
"aiohttp>=3.9.0",
]
Expand Down
77 changes: 54 additions & 23 deletions src/greptimedb_mcp_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ async def execute_tql(
],
end: Annotated[
str,
"End time: SQL expression (e.g., 'now()'), " "RFC3339, or Unix timestamp",
"End time: SQL expression (e.g., 'now()'), RFC3339, or Unix timestamp",
],
step: Annotated[str, "Query resolution step, e.g., '1m', '5m', '1h'"],
lookback: Annotated[str | None, "Lookback delta for range queries"] = None,
Expand Down Expand Up @@ -722,16 +722,14 @@ async def create_pipeline(
pipelines = result.get("pipelines", [])
version = pipelines[0]["version"] if pipelines else "unknown"
return (
f"Pipeline '{name}' created successfully.\n"
f"Version: {version}"
f"Pipeline '{name}' created successfully.\nVersion: {version}"
)
except (json.JSONDecodeError, KeyError, IndexError):
return f"Pipeline '{name}' created successfully."
else:
error_detail = response_text if response_text else "No details"
return (
f"Error creating pipeline (HTTP {response.status}): "
f"{error_detail}"
f"Error creating pipeline (HTTP {response.status}): {error_detail}"
)

except aiohttp.ClientError as e:
Expand All @@ -741,24 +739,59 @@ async def create_pipeline(

@mcp.tool()
async def dryrun_pipeline(
pipeline_name: Annotated[str, "Name of the pipeline to test"],
data: Annotated[str, "Test data in JSON format (single object or array)"],
pipeline: Annotated[
str | None,
"Pipeline configuration in YAML format (inline). Provide this to test a pipeline without saving it.",
] = None,
pipeline_name: Annotated[
str | None,
"Name of the saved pipeline to test. Provide either 'pipeline' or 'pipeline_name', not both.",
] = None,
data: Annotated[
str, "Test data in JSON or NDJSON format (single object or array)"
] = "",
data_type: Annotated[
str | None,
"Content type of the data (e.g., 'application/x-ndjson'). If omitted, GreptimeDB will use default.",
] = None,
) -> str:
"""Test a pipeline with sample data without writing to the database."""
"""Test a pipeline with sample data without writing to the database.

You can test a pipeline in two ways:
- Provide 'pipeline' with inline YAML configuration
- Provide 'pipeline_name' to test a previously saved pipeline

Args:
pipeline: Pipeline YAML configuration (inline)
pipeline_name: Name of saved pipeline (mutually exclusive with pipeline)
data: Test data in JSON/NDJSON format
data_type: Optional content type (e.g., 'application/x-ndjson')
"""
state = get_state()
pipeline_name = _validate_pipeline_name(pipeline_name)

try:
parsed = json.loads(data)
normalized_data = json.dumps(parsed, ensure_ascii=False)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON data: {str(e)}"
if not data or not data.strip():
return "Error: data parameter is required"

if pipeline is not None and pipeline_name is not None:
return "Error: Provide either 'pipeline' or 'pipeline_name', not both"

if pipeline is None and pipeline_name is None:
return "Error: Provide either 'pipeline' or 'pipeline_name'"

if pipeline_name is not None:
pipeline_name = _validate_pipeline_name(pipeline_name)

url = f"{state.http_base_url}/v1/pipelines/_dryrun"
request_body = {
"pipeline_name": pipeline_name,
"data": normalized_data,
}
request_body = {"data": data}

if data_type:
request_body["data_type"] = data_type

if pipeline is not None:
request_body["pipeline"] = pipeline
elif pipeline_name is not None:
request_body["pipeline_name"] = pipeline_name

auth = state.get_http_auth()
logger.debug(f"Dryrun request URL: {url}")
logger.debug(f"Dryrun request body: {request_body}")
Expand All @@ -780,12 +813,11 @@ async def dryrun_pipeline(
else:
error_detail = response_text if response_text else "No details"
return (
f"Error testing pipeline (HTTP {response.status}): "
f"{error_detail}"
f"Error testing pipeline (HTTP {response.status}): {error_detail}"
)

except aiohttp.ClientError as e:
logger.error(f"HTTP error testing pipeline '{pipeline_name}': {e}")
logger.error(f"HTTP error testing pipeline: {e}")
return f"Error testing pipeline: {str(e)}"


Expand Down Expand Up @@ -813,8 +845,7 @@ async def delete_pipeline(
else:
error_detail = response_text if response_text else "No details"
return (
f"Error deleting pipeline (HTTP {response.status}): "
f"{error_detail}"
f"Error deleting pipeline (HTTP {response.status}): {error_detail}"
)

except aiohttp.ClientError as e:
Expand Down
47 changes: 30 additions & 17 deletions src/greptimedb_mcp_server/templates/pipeline_creator/template.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,31 +132,42 @@ transform:

Generate a complete, valid YAML pipeline configuration. After generation:
1. Use `create_pipeline` tool to create the pipeline
2. Use `dryrun_pipeline` tool to verify with sample data
2. Use `dryrun_pipeline` tool with inline pipeline YAML to verify with sample data

**Note**: You can update an existing pipeline by calling `create_pipeline` with the same name. Each call creates a new version. Use `list_pipelines` to view all versions, and `delete_pipeline` to remove specific versions.

## Testing with dryrun_pipeline

The `dryrun_pipeline` tool accepts JSON data in the following formats:
Use `dryrun_pipeline` with separated parameters:

**Single log entry (JSON object with "message" field for plain text logs):**
```json
{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"}
**Example with inline pipeline YAML:**
```python
dryrun_pipeline(
pipeline='''version: 2
processors:
- date:
fields:
- timestamp
formats:
- '%Y-%m-%dT%H:%M:%SZ' ''',
data='{"timestamp": "2024-05-25T20:16:37Z", "level": "INFO"}',
data_type='application/json'
)
```

**Multiple log entries (JSON array):**
```json
[
{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"},
{"message": "192.168.1.1 - - [25/May/2024:20:17:37 +0000] \"POST /api/login HTTP/1.1\" 200 1784"}
]
**Example with saved pipeline:**
```python
dryrun_pipeline(
pipeline_name='my_log_pipeline',
data='{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}',
data_type='application/x-ndjson'
)
```

**Structured JSON logs (fields map directly to pipeline input):**
```json
{"timestamp": "2024-05-25 20:16:37", "level": "INFO", "service": "api", "message": "Request processed"}
```
**Data Formats:**
- **Single log entry:** `{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}`
- **Multiple entries (JSON array):** `[{"message": "log1"}, {"message": "log2"}]`
- **NDJSON (newline-delimited):** Use `data_type='application/x-ndjson'` with data like `"{"msg":"line1"}\n{"msg":"line2"}"`

## Common Log Format Examples

Expand Down Expand Up @@ -213,6 +224,8 @@ Pattern: `%{timestamp} %{hostname} %{app}[%{pid}]: %{message}`
## Troubleshooting

If `dryrun_pipeline` fails:
- **Missing required parameters**: Ensure you provide `data` and exactly one of `pipeline` or `pipeline_name`
- **Both pipeline and pipeline_name provided**: Only provide one of them
- **Pattern mismatch**: Check if dissect/regex pattern matches the log format exactly
- **Date format error**: Verify the date format string matches the timestamp in logs
- **Missing fields**: Use `ignore_missing: true` in processors to handle optional fields
Expand All @@ -230,11 +243,11 @@ curl -X POST "http://localhost:4000/v1/pipelines/my_pipeline" \
-H "Content-Type: application/x-yaml" \
-d @pipeline.yaml

# Dryrun pipeline
# Dryrun pipeline (constructs JSON request internally)
curl -X POST "http://localhost:4000/v1/pipelines/_dryrun" \
-u "<username>:<password>" \
-H "Content-Type: application/json" \
-d '{"pipeline_name": "my_pipeline", "data": "{\"message\": \"test log entry\"}"}'
-d '{"pipeline": "version: 2", "data": "{\"timestamp\": \"2024-05-25T20:16:37Z\"}", "data_type": "application/json"}'

# Delete pipeline
curl -X DELETE "http://localhost:4000/v1/pipelines/my_pipeline?version=<version>" \
Expand Down
28 changes: 22 additions & 6 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,18 +592,34 @@ async def test_create_pipeline_invalid_name():


@pytest.mark.asyncio
async def test_dryrun_pipeline_invalid_name():
"""Test dryrun_pipeline with invalid name"""
async def test_dryrun_pipeline_invalid_pipeline_name():
"""Test dryrun_pipeline with invalid pipeline name"""
with pytest.raises(ValueError) as excinfo:
await dryrun_pipeline(pipeline_name="123-invalid", data='{"message": "test"}')
assert "Invalid pipeline name" in str(excinfo.value)


@pytest.mark.asyncio
async def test_dryrun_pipeline_invalid_json():
"""Test dryrun_pipeline with invalid JSON data"""
result = await dryrun_pipeline(pipeline_name="test_pipeline", data="invalid json")
assert "Error: Invalid JSON data" in result
async def test_dryrun_pipeline_missing_data():
"""Test dryrun_pipeline without required data parameter"""
result = await dryrun_pipeline(pipeline="version: 2", data="")
assert "Error: data parameter is required" in result


@pytest.mark.asyncio
async def test_dryrun_pipeline_both_pipeline_and_name():
"""Test dryrun_pipeline with both pipeline and pipeline_name (should error)"""
result = await dryrun_pipeline(
pipeline="version: 2", pipeline_name="test_pipeline", data='{"message": "test"}'
)
assert "Error: Provide either 'pipeline' or 'pipeline_name', not both" in result


@pytest.mark.asyncio
async def test_dryrun_pipeline_neither_pipeline_nor_name():
"""Test dryrun_pipeline without pipeline or pipeline_name (should error)"""
result = await dryrun_pipeline(data='{"message": "test"}')
assert "Error: Provide either 'pipeline' or 'pipeline_name'" in result


@pytest.mark.asyncio
Expand Down
Loading