diff --git a/modelcontextprotocol/README.md b/modelcontextprotocol/README.md index af753b4..2ce7a32 100644 --- a/modelcontextprotocol/README.md +++ b/modelcontextprotocol/README.md @@ -146,6 +146,7 @@ Open `Cursor > Settings > Tools & Integrations > New MCP Server` to include the | `create_glossaries` | Create glossaries | | `create_glossary_categories` | Create glossary categories | | `create_glossary_terms` | Create glossary terms | +| `query_asset` | Execute SQL queries on table/view assets | ## Production Deployment diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index 080e9b9..67b0679 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -7,6 +7,7 @@ get_assets_by_dsl, traverse_lineage, update_assets, + query_asset, create_glossary_category_assets, create_glossary_assets, create_glossary_term_assets, @@ -492,6 +493,78 @@ def update_assets_tool( } +@mcp.tool() +def query_asset_tool( + sql: str, connection_qualified_name: str, default_schema: str | None = None +): + """ + Execute a SQL query on a table/view asset. + + This tool enables querying table/view assets on the source similar to + what's available in the insights table. It uses the Atlan query capabilities + to execute SQL against connected data sources. + + CRITICAL: Use READ-ONLY queries to retrieve data. Write and modify queries are not supported by this tool. + + + Args: + sql (str): The SQL query to execute (read-only queries allowed) + connection_qualified_name (str): Connection qualified name to use for the query. + This is the same parameter used in search_assets_tool. + You can find this value by searching for Table/View assets using search_assets_tool + and extracting the first part of the 'qualifiedName' attribute. + Example: from "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES" + use "default/snowflake/1657275059" + default_schema (str, optional): Default schema name to use for unqualified + objects in the SQL, in the form "DB.SCHEMA" + (e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE") + + Examples: + # Use case: How to query the PAGES table and retrieve the first 10 rows + # Find tables to query using search_assets_tool + tables = search_assets_tool( + asset_type="Table", + conditions={"name": "PAGES"}, + limit=5 + ) + # Extract connection info from the table's qualifiedName + # Example qualifiedName: "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES" + # connection_qualified_name: "default/snowflake/1657275059" + # database.schema: "LANDING.FRONTEND_PROD" + + # Query the table using extracted connection info + result = query_asset_tool( + sql='SELECT * FROM PAGES LIMIT 10', + connection_qualified_name="default/snowflake/1657275059", + default_schema="LANDING.FRONTEND_PROD" + ) + + # Query without specifying default schema (fully qualified table names) + result = query_asset_tool( + sql='SELECT COUNT(*) FROM "LANDING"."FRONTEND_PROD"."PAGES"', + connection_qualified_name="default/snowflake/1657275059" + ) + + # Complex analytical query on PAGES table + result = query_asset_tool( + sql=''' + SELECT + page_type, + COUNT(*) AS page_count, + AVG(load_time) AS avg_load_time, + MAX(views) AS max_views + FROM PAGES + WHERE created_date >= '2024-01-01' + GROUP BY page_type + ORDER BY page_count DESC + ''', + connection_qualified_name="default/snowflake/1657275059", + default_schema="LANDING.FRONTEND_PROD" + ) + """ + return query_asset(sql, connection_qualified_name, default_schema) + + @mcp.tool() def create_glossaries(glossaries) -> List[Dict[str, Any]]: """ diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index 5e057c5..851b0d5 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -2,6 +2,7 @@ from .dsl import get_assets_by_dsl from .lineage import traverse_lineage from .assets import update_assets +from .query import query_asset from .glossary import ( create_glossary_category_assets, create_glossary_assets, @@ -21,6 +22,7 @@ "get_assets_by_dsl", "traverse_lineage", "update_assets", + "query_asset", "create_glossary_category_assets", "create_glossary_assets", "create_glossary_term_assets", diff --git a/modelcontextprotocol/tools/query.py b/modelcontextprotocol/tools/query.py new file mode 100644 index 0000000..951c9cd --- /dev/null +++ b/modelcontextprotocol/tools/query.py @@ -0,0 +1,121 @@ +""" +Query tool for executing SQL queries on table/view assets. + +This module provides functionality to execute SQL queries on data sources +using the Atlan client. +""" + +import logging +from typing import Dict, Any, Optional + +from client import get_atlan_client +from pyatlan.model.query import QueryRequest + +# Configure logging +logger = logging.getLogger(__name__) + + +def query_asset( + sql: str, + connection_qualified_name: str, + default_schema: Optional[str] = None, +) -> Dict[str, Any]: + """ + Execute a SQL query on a table/view asset. + + Note: + Use read-only queries to retrieve data. + Please add reasonable LIMIT clauses to your SQL queries to avoid + overwhelming the client or causing timeouts. Large result sets can + cause performance issues or crash the client application. + + Args: + sql (str): The SQL query to execute (read-only queries) + connection_qualified_name (str): Connection qualified name to use for the query + (e.g., "default/snowflake/1705755637") + default_schema (str, optional): Default schema name to use for unqualified + objects in the SQL, in the form "DB.SCHEMA" + (e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE") + + Returns: + Dict[str, Any]: Dictionary containing: + - success: Boolean indicating if the query was successful + - data: Query result data (rows, columns) if successful + - error: Error message if query failed + - query_info: Additional query execution information + + Raises: + Exception: If there's an error executing the query + """ + logger.info( + f"Starting SQL query execution on connection: {connection_qualified_name}" + ) + logger.debug(f"SQL query: {sql}") + logger.debug(f"Parameters - default_schema: {default_schema}") + + try: + # Validate required parameters + if not sql or not sql.strip(): + error_msg = "SQL query cannot be empty" + logger.error(error_msg) + return { + "success": False, + "data": None, + "error": error_msg, + "query_info": {}, + } + + if not connection_qualified_name or not connection_qualified_name.strip(): + error_msg = "Connection qualified name cannot be empty" + logger.error(error_msg) + return { + "success": False, + "data": None, + "error": error_msg, + "query_info": {}, + } + + # Get Atlan client + logger.debug("Getting Atlan client") + client = get_atlan_client() + + # Build query request + logger.debug("Building QueryRequest object") + query_request = QueryRequest( + sql=sql, + data_source_name=connection_qualified_name, + default_schema=default_schema, + ) + + # Execute query + logger.info("Executing SQL query") + query_response = client.queries.stream(request=query_request) + + logger.info("Query executed successfully, returning response") + + return { + "success": True, + "data": query_response, + "error": None, + "query_info": { + "data_source": connection_qualified_name, + "default_schema": default_schema, + "sql": sql, + }, + } + + except Exception as e: + error_msg = f"Error executing SQL query: {str(e)}" + logger.error(error_msg) + logger.exception("Exception details:") + + return { + "success": False, + "data": None, + "error": error_msg, + "query_info": { + "data_source": connection_qualified_name, + "default_schema": default_schema, + "sql": sql, + }, + }