diff --git a/modelcontextprotocol/server.py b/modelcontextprotocol/server.py index a6789bd..a562580 100644 --- a/modelcontextprotocol/server.py +++ b/modelcontextprotocol/server.py @@ -16,6 +16,7 @@ CertificateStatus, UpdatableAsset, TermOperations, + get_asset_history, ) from pyatlan.model.lineage import LineageDirection from utils.parameters import ( @@ -718,6 +719,57 @@ def query_asset_tool( return query_asset(sql, connection_qualified_name, default_schema) +@mcp.tool() +def get_asset_history_tool( + guid=None, qualified_name=None, type_name=None, size=10, sort_order="DESC" +): + """ + Get the audit history of an asset by GUID or qualified name. + + Args: + guid (str, optional): GUID of the asset to get history for. + Either guid or qualified_name must be provided. + qualified_name (str, optional): Qualified name of the asset to get history for. + Either guid or qualified_name must be provided. + type_name (str, optional): Type name of the asset (required when using qualified_name). + Examples: "Table", "Column", "DbtModel", "AtlasGlossary" + size (int): Number of history entries to return. Defaults to 10. Maximum is 50. + Size validation is handled automatically by Pydantic. + sort_order (str): Sort order for results. "ASC" for oldest first, "DESC" for newest first. + Defaults to "DESC". Validation is handled automatically by Pydantic. + + Returns: + AssetHistoryResponse: Pydantic model containing: + - entity_audits: List of audit entries with details about each change + - count: Number of audit entries returned + - total_count: Total number of audit entries available + - errors: List of any validation or retrieval errors encountered + + Examples: + # Get history by GUID + history = get_asset_history_tool( + guid="6fc01478-1263-42ae-b8ca-c4a57da51392", + size=20, + sort_order="DESC", + ) + + # Get history by qualified name + history = get_asset_history_tool( + qualified_name="default/dbt/1755018137/account/258239/project/376530/model.simple_column_lineage.order_maths", + type_name="DbtModel", + size=15, + sort_order="ASC" + ) + """ + return get_asset_history( + guid=guid, + qualified_name=qualified_name, + type_name=type_name, + size=size, + sort_order=sort_order, + ) + + @mcp.tool() def create_glossaries(glossaries) -> List[Dict[str, Any]]: """ diff --git a/modelcontextprotocol/tools/__init__.py b/modelcontextprotocol/tools/__init__.py index f9d18f6..54f6b8c 100644 --- a/modelcontextprotocol/tools/__init__.py +++ b/modelcontextprotocol/tools/__init__.py @@ -1,8 +1,8 @@ from .search import search_assets from .dsl import get_assets_by_dsl from .lineage import traverse_lineage -from .assets import update_assets from .query import query_asset +from .assets import update_assets, get_asset_history from .glossary import ( create_glossary_category_assets, create_glossary_assets, @@ -16,6 +16,9 @@ Glossary, GlossaryCategory, GlossaryTerm, + AssetHistoryRequest, + AssetHistoryResponse, + AuditEntry, ) __all__ = [ @@ -24,6 +27,7 @@ "traverse_lineage", "update_assets", "query_asset", + "get_asset_history", "create_glossary_category_assets", "create_glossary_assets", "create_glossary_term_assets", @@ -34,4 +38,7 @@ "Glossary", "GlossaryCategory", "GlossaryTerm", + "AssetHistoryRequest", + "AssetHistoryResponse", + "AuditEntry", ] diff --git a/modelcontextprotocol/tools/assets.py b/modelcontextprotocol/tools/assets.py index e6abf18..efda393 100644 --- a/modelcontextprotocol/tools/assets.py +++ b/modelcontextprotocol/tools/assets.py @@ -1,5 +1,6 @@ import logging -from typing import List, Union, Dict, Any +from typing import List, Union, Dict, Any, Optional +from pydantic import ValidationError from client import get_atlan_client from .models import ( UpdatableAsset, @@ -7,9 +8,15 @@ CertificateStatus, TermOperation, TermOperations, + AssetHistoryRequest, + AssetHistoryResponse, ) from pyatlan.model.assets import Readme, AtlasGlossaryTerm from pyatlan.model.fluent_search import CompoundQuery, FluentSearch +from utils.asset_history import ( + create_audit_search_request, + process_audit_result, +) # Initialize logging logger = logging.getLogger(__name__) @@ -184,3 +191,91 @@ def update_assets( error_msg = f"Error updating assets: {str(e)}" logger.error(error_msg) return {"updated_count": 0, "errors": [error_msg]} + + +def get_asset_history( + guid: Optional[str] = None, + qualified_name: Optional[str] = None, + type_name: Optional[str] = None, + size: int = 10, + sort_order: str = "DESC", +) -> AssetHistoryResponse: + """ + Get the audit history of an asset by GUID or qualified name. + + Args: + guid (Optional[str]): GUID of the asset to get history for. + Either guid or qualified_name must be provided. + qualified_name (Optional[str]): Qualified name of the asset to get history for. + Either guid or qualified_name must be provided. + type_name (Optional[str]): Type name of the asset (required when using qualified_name). + Examples: "Table", "Column", "DbtModel", "AtlasGlossary" + size (int): Number of history entries to return. Defaults to 10. Maximum is 50. + sort_order (str): Sort order for results. "ASC" for oldest first, "DESC" for newest first. + Defaults to "DESC". + + Returns: + AssetHistoryResponse: Response containing: + - entity_audits: List of audit entries + - count: Number of audit entries returned + - total_count: Total number of audit entries available + - errors: List of any errors encountered + """ + try: + # Validate input parameters using Pydantic model + request_model = AssetHistoryRequest( + guid=guid, + qualified_name=qualified_name, + type_name=type_name, + size=size, + sort_order=sort_order, + ) + + logger.info( + f"Retrieving asset history with parameters: guid={request_model.guid}, " + f"qualified_name={request_model.qualified_name}, size={request_model.size}" + ) + + # Get Atlan client + client = get_atlan_client() + + # Create and execute audit search request + request = create_audit_search_request( + request_model.guid, + request_model.qualified_name, + request_model.type_name, + request_model.size, + request_model.sort_order, + ) + response = client.audit.search(criteria=request, bulk=False) + + # Process audit results - use current_page() to respect size parameter + entity_audits = [ + process_audit_result(result) for result in response.current_page() + ] + + logger.info( + f"Successfully retrieved {len(entity_audits)} audit entries for asset" + ) + + return AssetHistoryResponse( + entity_audits=entity_audits, + count=len(entity_audits), + total_count=response.total_count, + errors=[], + ) + + except ValidationError as e: + error_messages = [f"{err['loc'][0]}: {err['msg']}" for err in e.errors()] + error_msg = f"Validation error: {'; '.join(error_messages)}" + logger.error(error_msg) + return AssetHistoryResponse( + entity_audits=[], count=0, total_count=0, errors=error_messages + ) + except Exception as e: + error_msg = f"Error retrieving asset history: {str(e)}" + logger.error(error_msg) + logger.exception("Exception details:") + return AssetHistoryResponse( + entity_audits=[], count=0, total_count=0, errors=[error_msg] + ) diff --git a/modelcontextprotocol/tools/models.py b/modelcontextprotocol/tools/models.py index 95bd048..8c5af82 100644 --- a/modelcontextprotocol/tools/models.py +++ b/modelcontextprotocol/tools/models.py @@ -1,7 +1,7 @@ from enum import Enum from typing import Optional, List -from pydantic import BaseModel +from pydantic import BaseModel, field_validator, model_validator class CertificateStatus(str, Enum): @@ -73,3 +73,66 @@ class GlossaryTerm(BaseModel): user_description: Optional[str] = None certificate_status: Optional[CertificateStatus] = None category_guids: Optional[List[str]] = None + + +class AssetHistoryRequest(BaseModel): + """Request model for asset history retrieval.""" + + guid: Optional[str] = None + qualified_name: Optional[str] = None + type_name: Optional[str] = None + size: int = 10 + sort_order: str = "DESC" + + @model_validator(mode="after") + def validate_asset_identifier(self) -> "AssetHistoryRequest": + """Validate that either guid or qualified_name is provided.""" + if not self.guid and not self.qualified_name: + raise ValueError("Either guid or qualified_name must be provided") + + if self.qualified_name and not self.type_name: + raise ValueError("type_name is required when using qualified_name") + + return self + + @field_validator("sort_order") + @classmethod + def validate_sort_order(cls, v: str) -> str: + """Validate sort order is either ASC or DESC.""" + if v not in ["ASC", "DESC"]: + raise ValueError("sort_order must be either 'ASC' or 'DESC'") + return v + + @field_validator("size") + @classmethod + def validate_size(cls, v: int) -> int: + """Validate size is positive and within limits.""" + if v <= 0: + raise ValueError("size must be greater than 0") + if v > 50: + raise ValueError("size cannot exceed 50") + return v + + +class AuditEntry(BaseModel): + """Model for a single audit entry.""" + + guid: Optional[str] = None + timestamp: Optional[int] = None + action: Optional[str] = None + user: Optional[str] = None + detail: Optional[dict] = None + + class Config: + """Pydantic config.""" + + extra = "allow" # Allow additional fields from audit detail + + +class AssetHistoryResponse(BaseModel): + """Response model for asset history.""" + + entity_audits: List[AuditEntry] + count: int + total_count: int + errors: List[str] = [] diff --git a/modelcontextprotocol/utils/asset_history.py b/modelcontextprotocol/utils/asset_history.py new file mode 100644 index 0000000..5218b78 --- /dev/null +++ b/modelcontextprotocol/utils/asset_history.py @@ -0,0 +1,93 @@ +""" +Utility functions for asset history operations. + +This module contains helper functions for retrieving and processing asset audit history. +""" + +import logging +from typing import Optional +from pyatlan.client.audit import AuditSearchRequest +from pyatlan.model.search import DSL, Bool, SortItem, Term +from pyatlan.model.enums import SortOrder + +# Initialize logging +logger = logging.getLogger(__name__) + + +def create_audit_search_request( + guid: Optional[str], + qualified_name: Optional[str], + type_name: Optional[str], + size: int, + sort_order: str, +) -> AuditSearchRequest: + """ + Create an AuditSearchRequest based on the provided parameters. + + Args: + guid: Asset GUID + qualified_name: Asset qualified name + type_name: Asset type name + size: Number of results to return + sort_order: Sort order ("ASC" or "DESC") + + Returns: + Configured AuditSearchRequest + """ + # Create sort item inline + sort_item = SortItem( + "created", + order=SortOrder.DESCENDING if sort_order == "DESC" else SortOrder.ASCENDING, + ) + + if guid: + dsl = DSL( + query=Bool(filter=[Term(field="entityId", value=guid)]), + sort=[sort_item], + size=size, + ) + logger.debug(f"Created audit search request by GUID: {guid}") + else: + dsl = DSL( + query=Bool( + must=[ + Term(field="entityQualifiedName", value=qualified_name), + Term(field="typeName", value=type_name), + ] + ), + sort=[sort_item], + size=size, + ) + logger.debug( + f"Created audit search request by qualified name: {qualified_name}" + ) + + return AuditSearchRequest(dsl=dsl) + + +def process_audit_result(result): + """ + Process a single audit result into AuditEntry data. + + Args: + result: Audit result object + + Returns: + Dictionary with audit entry data for AuditEntry model + """ + # Import here to avoid circular dependency + from tools.models import AuditEntry + + # Extract basic audit information + audit_data = { + "guid": getattr(result, "entity_id", None), + "timestamp": getattr(result, "timestamp", None), + "action": getattr(result, "action", None), + "user": getattr(result, "user", None), + } + + # Add detail updates as additional fields + detail_updates = result.detail.dict(exclude_unset=True) + audit_data.update(detail_updates) + + return AuditEntry(**audit_data)