1 change: 1 addition & 0 deletions .github/workflows/python-app.yml
@@ -24,6 +24,7 @@ jobs:
- uses: actions/checkout@v4
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v5
- run: uv python pin cp311
- name: Install dependencies
run: uv sync
- name: Format
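The added step pins uv to CPython 3.11 before dependencies are installed. A minimal sketch of the resulting step order (all steps other than the new `uv python pin` line are as shown above):

```yaml
# Order matters: the pin writes `cp311` to .python-version, so the
# subsequent `uv sync` creates the virtualenv with CPython 3.11.
- uses: actions/checkout@v4
- name: Install the latest version of uv
  uses: astral-sh/setup-uv@v5
- run: uv python pin cp311      # new: pin the interpreter
- name: Install dependencies
  run: uv sync                  # resolves against the pinned interpreter
```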
77 changes: 54 additions & 23 deletions src/greptimedb_mcp_server/server.py
@@ -374,7 +374,7 @@ async def execute_tql(
],
end: Annotated[
str,
"End time: SQL expression (e.g., 'now()'), " "RFC3339, or Unix timestamp",
"End time: SQL expression (e.g., 'now()'), RFC3339, or Unix timestamp",
],
step: Annotated[str, "Query resolution step, e.g., '1m', '5m', '1h'"],
lookback: Annotated[str | None, "Lookback delta for range queries"] = None,
@@ -722,16 +722,14 @@ async def create_pipeline(
pipelines = result.get("pipelines", [])
version = pipelines[0]["version"] if pipelines else "unknown"
return (
f"Pipeline '{name}' created successfully.\n"
f"Version: {version}"
f"Pipeline '{name}' created successfully.\nVersion: {version}"
)
except (json.JSONDecodeError, KeyError, IndexError):
return f"Pipeline '{name}' created successfully."
else:
error_detail = response_text if response_text else "No details"
return (
f"Error creating pipeline (HTTP {response.status}): "
f"{error_detail}"
f"Error creating pipeline (HTTP {response.status}): {error_detail}"
)

except aiohttp.ClientError as e:
@@ -741,24 +739,59 @@ async def create_pipeline(

@mcp.tool()
async def dryrun_pipeline(
pipeline_name: Annotated[str, "Name of the pipeline to test"],
data: Annotated[str, "Test data in JSON format (single object or array)"],
pipeline: Annotated[
str | None,
"Pipeline configuration in YAML format (inline). Provide this to test a pipeline without saving it.",
] = None,
pipeline_name: Annotated[
str | None,
"Name of the saved pipeline to test. Provide either 'pipeline' or 'pipeline_name', not both.",
] = None,
data: Annotated[
str, "Test data in JSON or NDJSON format (single object or array)"
] = "",
data_type: Annotated[
str | None,
"Content type of the data (e.g., 'application/x-ndjson'). If omitted, GreptimeDB will use default.",
] = None,
) -> str:
"""Test a pipeline with sample data without writing to the database."""
"""Test a pipeline with sample data without writing to the database.

You can test a pipeline in two ways:
- Provide 'pipeline' with inline YAML configuration
- Provide 'pipeline_name' to test a previously saved pipeline

Args:
pipeline: Pipeline YAML configuration (inline)
pipeline_name: Name of saved pipeline (mutually exclusive with pipeline)
data: Test data in JSON/NDJSON format
data_type: Optional content type (e.g., 'application/x-ndjson')
"""
state = get_state()
pipeline_name = _validate_pipeline_name(pipeline_name)

try:
parsed = json.loads(data)
normalized_data = json.dumps(parsed, ensure_ascii=False)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON data: {str(e)}"
if not data or not data.strip():
return "Error: data parameter is required"

if pipeline is not None and pipeline_name is not None:
return "Error: Provide either 'pipeline' or 'pipeline_name', not both"

if pipeline is None and pipeline_name is None:
return "Error: Provide either 'pipeline' or 'pipeline_name'"

if pipeline_name is not None:
pipeline_name = _validate_pipeline_name(pipeline_name)

url = f"{state.http_base_url}/v1/pipelines/_dryrun"
request_body = {
"pipeline_name": pipeline_name,
"data": normalized_data,
}
request_body = {"data": data}

if data_type:
request_body["data_type"] = data_type

if pipeline is not None:
request_body["pipeline"] = pipeline
elif pipeline_name is not None:
request_body["pipeline_name"] = pipeline_name

auth = state.get_http_auth()
logger.debug(f"Dryrun request URL: {url}")
logger.debug(f"Dryrun request body: {request_body}")
@@ -780,12 +813,11 @@ async def dryrun_pipeline(
else:
error_detail = response_text if response_text else "No details"
return (
f"Error testing pipeline (HTTP {response.status}): "
f"{error_detail}"
f"Error testing pipeline (HTTP {response.status}): {error_detail}"
)

except aiohttp.ClientError as e:
logger.error(f"HTTP error testing pipeline '{pipeline_name}': {e}")
logger.error(f"HTTP error testing pipeline: {e}")
return f"Error testing pipeline: {str(e)}"


@@ -813,8 +845,7 @@ async def delete_pipeline(
else:
error_detail = response_text if response_text else "No details"
return (
f"Error deleting pipeline (HTTP {response.status}): "
f"{error_detail}"
f"Error deleting pipeline (HTTP {response.status}): {error_detail}"
)

except aiohttp.ClientError as e:
47 changes: 30 additions & 17 deletions src/greptimedb_mcp_server/templates/pipeline_creator/template.md
@@ -132,31 +132,42 @@ transform:

Generate a complete, valid YAML pipeline configuration. After generation:
1. Use `create_pipeline` tool to create the pipeline
2. Use `dryrun_pipeline` tool to verify with sample data
2. Use `dryrun_pipeline` tool with inline pipeline YAML to verify with sample data

**Note**: You can update an existing pipeline by calling `create_pipeline` with the same name. Each call creates a new version. Use `list_pipelines` to view all versions, and `delete_pipeline` to remove specific versions.
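A hypothetical sketch of that versioning flow; the exact parameter names of `list_pipelines` and `delete_pipeline` are assumptions, not taken from this PR:

```python
# Each create_pipeline call with the same name adds a new version.
create_pipeline(name="nginx_logs", pipeline=yaml_v1)  # first version
create_pipeline(name="nginx_logs", pipeline=yaml_v2)  # new version, same name

list_pipelines()  # inspect all versions (signature assumed)

# Remove one specific version (parameter names assumed).
delete_pipeline(name="nginx_logs", version="<version>")
```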

## Testing with dryrun_pipeline

The `dryrun_pipeline` tool accepts JSON data in the following formats:
Use `dryrun_pipeline` with either an inline pipeline or a saved pipeline name:

**Single log entry (JSON object with "message" field for plain text logs):**
```json
{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"}
**Example with inline pipeline YAML:**
```python
dryrun_pipeline(
pipeline='''version: 2
processors:
- date:
fields:
- timestamp
formats:
- '%Y-%m-%dT%H:%M:%SZ' ''',
data='{"timestamp": "2024-05-25T20:16:37Z", "level": "INFO"}',
data_type='application/json'
)
```

**Multiple log entries (JSON array):**
```json
[
{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000] \"GET /index.html HTTP/1.1\" 200 612"},
{"message": "192.168.1.1 - - [25/May/2024:20:17:37 +0000] \"POST /api/login HTTP/1.1\" 200 1784"}
]
**Example with saved pipeline:**
```python
dryrun_pipeline(
pipeline_name='my_log_pipeline',
data='{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}',
data_type='application/x-ndjson'
)
```

**Structured JSON logs (fields map directly to pipeline input):**
```json
{"timestamp": "2024-05-25 20:16:37", "level": "INFO", "service": "api", "message": "Request processed"}
```
**Data Formats:**
- **Single log entry:** `{"message": "127.0.0.1 - - [25/May/2024:20:16:37 +0000]"}`
- **Multiple entries (JSON array):** `[{"message": "log1"}, {"message": "log2"}]`
- **NDJSON (newline-delimited):** Use `data_type='application/x-ndjson'` with newline-separated JSON objects, e.g. `'{"msg":"line1"}\n{"msg":"line2"}'` (see the sketch below)
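Escaping NDJSON inside a quoted string by hand is error-prone; a minimal sketch that builds the payload programmatically instead, using the `dryrun_pipeline` signature introduced in this PR:

```python
import json

# Serialize each entry with json.dumps so quoting/escaping is always valid.
entries = [{"msg": "line1"}, {"msg": "line2"}]
ndjson = "\n".join(json.dumps(e) for e in entries)

dryrun_pipeline(
    pipeline_name="my_log_pipeline",  # or pass `pipeline=...` with inline YAML
    data=ndjson,
    data_type="application/x-ndjson",
)
```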

## Common Log Format Examples

@@ -213,6 +224,8 @@ Pattern: `%{timestamp} %{hostname} %{app}[%{pid}]: %{message}`
## Troubleshooting

If `dryrun_pipeline` fails:
- **Missing required parameters**: Ensure you provide `data` and exactly one of `pipeline` or `pipeline_name`
- **Both pipeline and pipeline_name provided**: Only provide one of them
- **Pattern mismatch**: Check if dissect/regex pattern matches the log format exactly
- **Date format error**: Verify the date format string matches the timestamp in logs
- **Missing fields**: Use `ignore_missing: true` in processors to handle optional fields
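For the last point, a minimal sketch of `ignore_missing` on the `date` processor used earlier in this template (field names are illustrative):

```yaml
processors:
  - date:
      fields:
        - timestamp
      formats:
        - '%Y-%m-%dT%H:%M:%SZ'
      ignore_missing: true  # don't fail entries that lack the timestamp field
```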
@@ -230,11 +243,11 @@ curl -X POST "http://localhost:4000/v1/pipelines/my_pipeline" \
-H "Content-Type: application/x-yaml" \
-d @pipeline.yaml

# Dryrun pipeline
# Dryrun pipeline (this is the JSON body that the dryrun_pipeline tool builds internally)
curl -X POST "http://localhost:4000/v1/pipelines/_dryrun" \
-u "<username>:<password>" \
-H "Content-Type: application/json" \
-d '{"pipeline_name": "my_pipeline", "data": "{\"message\": \"test log entry\"}"}'
-d '{"pipeline": "version: 2", "data": "{\"timestamp\": \"2024-05-25T20:16:37Z\"}", "data_type": "application/json"}'

# Delete pipeline
curl -X DELETE "http://localhost:4000/v1/pipelines/my_pipeline?version=<version>" \
55 changes: 49 additions & 6 deletions tests/test_server.py
@@ -592,18 +592,61 @@ async def test_create_pipeline_invalid_name():


@pytest.mark.asyncio
async def test_dryrun_pipeline_invalid_name():
"""Test dryrun_pipeline with invalid name"""
async def test_dryrun_pipeline_inline():
"""Test dryrun_pipeline with inline pipeline YAML"""
result = await dryrun_pipeline(pipeline="version: 2", data='{"message": "test"}')
assert "Error testing pipeline" in result or "pipelines" in result.lower()


@pytest.mark.asyncio
async def test_dryrun_pipeline_with_pipeline_name():
"""Test dryrun_pipeline with saved pipeline name"""
result = await dryrun_pipeline(
pipeline_name="test_pipeline", data='{"message": "test"}'
)
assert "Error testing pipeline" in result or "pipelines" in result.lower()


@pytest.mark.asyncio
async def test_dryrun_pipeline_invalid_pipeline_name():
"""Test dryrun_pipeline with invalid pipeline name"""
with pytest.raises(ValueError) as excinfo:
await dryrun_pipeline(pipeline_name="123-invalid", data='{"message": "test"}')
assert "Invalid pipeline name" in str(excinfo.value)


@pytest.mark.asyncio
async def test_dryrun_pipeline_invalid_json():
"""Test dryrun_pipeline with invalid JSON data"""
result = await dryrun_pipeline(pipeline_name="test_pipeline", data="invalid json")
assert "Error: Invalid JSON data" in result
async def test_dryrun_pipeline_with_data_type():
"""Test dryrun_pipeline with data_type parameter"""
result = await dryrun_pipeline(
pipeline="version: 2",
data='{"message": "test"}',
data_type="application/x-ndjson",
)
assert "Error testing pipeline" in result or "pipelines" in result.lower()


@pytest.mark.asyncio
async def test_dryrun_pipeline_missing_data():
"""Test dryrun_pipeline without required data parameter"""
result = await dryrun_pipeline(pipeline="version: 2", data="")
assert "Error: data parameter is required" in result


@pytest.mark.asyncio
async def test_dryrun_pipeline_both_pipeline_and_name():
"""Test dryrun_pipeline with both pipeline and pipeline_name (should error)"""
result = await dryrun_pipeline(
pipeline="version: 2", pipeline_name="test_pipeline", data='{"message": "test"}'
)
assert "Error: Provide either 'pipeline' or 'pipeline_name', not both" in result


@pytest.mark.asyncio
async def test_dryrun_pipeline_neither_pipeline_nor_name():
"""Test dryrun_pipeline without pipeline or pipeline_name (should error)"""
result = await dryrun_pipeline(data='{"message": "test"}')
assert "Error: Provide either 'pipeline' or 'pipeline_name'" in result


@pytest.mark.asyncio