Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions modelcontextprotocol/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
get_assets_by_dsl,
traverse_lineage,
update_assets,
query_asset,
UpdatableAttribute,
CertificateStatus,
UpdatableAsset,
Expand Down Expand Up @@ -457,6 +458,86 @@ def update_assets_tool(
}


@mcp.tool()
def query_asset_tool(
sql: str,
connection_qualified_name: str,
default_schema: str = None,
):
"""
Execute a SQL query on a table/view asset.

This tool enables querying table/view assets on the source similar to
what's available in the insights table. It uses the Atlan query capabilities
to execute SQL against connected data sources.

Note:
Use read-only queries to retrieve data.

Args:
sql (str): The SQL query to execute (read-only queries allowed)
connection_qualified_name (str): Connection qualified name to use for the query.
This is the same parameter used in search_assets_tool.
You can find this value by searching for Table/View assets using search_assets_tool
and extracting the first part of the 'qualifiedName' attribute.
Example: from "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES"
use "default/snowflake/1657275059"
default_schema (str, optional): Default schema name to use for unqualified
objects in the SQL, in the form "DB.SCHEMA"
(e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE")

Returns:
Dict[str, Any]: Dictionary containing:
- success: Boolean indicating if the query was successful
- data: Query result data (rows, columns) if successful
- error: Error message if query failed
- query_info: Additional query execution information

Examples:
# Find tables to query using search_assets_tool
tables = search_assets_tool(
asset_type="Table",
conditions={"name": "PAGES"},
limit=5
)
# Extract connection info from the table's qualifiedName
# Example qualifiedName: "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES"
# connection_qualified_name: "default/snowflake/1657275059"
# database.schema: "LANDING.FRONTEND_PROD"

# Query the table using extracted connection info
result = query_asset_tool(
sql='SELECT * FROM PAGES LIMIT 10',
connection_qualified_name="default/snowflake/1657275059",
default_schema="LANDING.FRONTEND_PROD"
)

# Query without specifying default schema (fully qualified table names)
result = query_asset_tool(
sql='SELECT COUNT(*) FROM "RAW"."WIDEWORLDIMPORTERS_WAREHOUSE"."ORDERS"',
connection_qualified_name="default/postgres/connection123"
)

# Complex analytical query
result = query_asset_tool(
sql='''
SELECT
category,
COUNT(*) AS product_count,
AVG(price) AS avg_price,
MAX(price) AS max_price
FROM products
WHERE created_date >= '2024-01-01'
GROUP BY category
ORDER BY product_count DESC
''',
connection_qualified_name="default/snowflake/analytics_db",
default_schema="ANALYTICS.PRODUCTS"
)
"""
return query_asset(sql, connection_qualified_name, default_schema)


def main():
mcp.run()

Expand Down
2 changes: 2 additions & 0 deletions modelcontextprotocol/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
from .dsl import get_assets_by_dsl
from .lineage import traverse_lineage
from .assets import update_assets
from .query import query_asset
from .models import CertificateStatus, UpdatableAttribute, UpdatableAsset

__all__ = [
"search_assets",
"get_assets_by_dsl",
"traverse_lineage",
"update_assets",
"query_asset",
"CertificateStatus",
"UpdatableAttribute",
"UpdatableAsset",
Expand Down
124 changes: 124 additions & 0 deletions modelcontextprotocol/tools/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""
Query tool for executing SQL queries on table/view assets.

This module provides functionality to execute SQL queries on data sources
using the Atlan client.
"""

import logging
from typing import Dict, Any, Optional

from client import get_atlan_client
from pyatlan.model.query import QueryRequest

# Configure logging
logger = logging.getLogger(__name__)


def query_asset(
sql: str,
connection_qualified_name: str,
default_schema: Optional[str] = None,
) -> Dict[str, Any]:
"""
Execute a SQL query on a table/view asset.

Note:
Use read-only queries to retrieve data.
Please add reasonable LIMIT clauses to your SQL queries to avoid
overwhelming the client or causing timeouts. Large result sets can
cause performance issues or crash the client application.

Args:
sql (str): The SQL query to execute (read-only queries)
connection_qualified_name (str): Connection qualified name to use for the query
(e.g., "default/snowflake/1705755637")
default_schema (str, optional): Default schema name to use for unqualified
objects in the SQL, in the form "DB.SCHEMA"
(e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE")

Returns:
Dict[str, Any]: Dictionary containing:
- success: Boolean indicating if the query was successful
- data: Query result data (rows, columns) if successful
- error: Error message if query failed
- query_info: Additional query execution information

Raises:
Exception: If there's an error executing the query
"""
logger.info(
f"Starting SQL query execution on connection: {connection_qualified_name}"
)
logger.debug(f"SQL query: {sql}")
logger.debug(f"Parameters - default_schema: {default_schema}")

try:
# Validate required parameters
if not sql or not sql.strip():
error_msg = "SQL query cannot be empty"
logger.error(error_msg)
return {
"success": False,
"data": None,
"error": error_msg,
"query_info": {}
}

if not connection_qualified_name or not connection_qualified_name.strip():
error_msg = "Connection qualified name cannot be empty"
logger.error(error_msg)
return {
"success": False,
"data": None,
"error": error_msg,
"query_info": {}
}


# Get Atlan client
logger.debug("Getting Atlan client")
client = get_atlan_client()

# Build query request
logger.debug("Building QueryRequest object")
query_request = QueryRequest(
sql=sql,
data_source_name=connection_qualified_name,
default_schema=default_schema
)

# Execute query
logger.info("Executing SQL query")
query_response = client.queries.stream(request=query_request)

logger.info(
f"Query executed successfully, returning response"
)

return {
"success": True,
"data": query_response,
"error": None,
"query_info": {
"data_source": connection_qualified_name,
"default_schema": default_schema,
"sql": sql
}
}

except Exception as e:
error_msg = f"Error executing SQL query: {str(e)}"
logger.error(error_msg)
logger.exception("Exception details:")

return {
"success": False,
"data": None,
"error": error_msg,
"query_info": {
"data_source": connection_qualified_name,
"default_schema": default_schema,
"sql": sql
}
}