Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions modelcontextprotocol/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
CertificateStatus,
UpdatableAsset,
TermOperations,
get_asset_history,
)
from pyatlan.model.lineage import LineageDirection
from utils.parameters import (
Expand Down Expand Up @@ -718,6 +719,56 @@ def query_asset_tool(
return query_asset(sql, connection_qualified_name, default_schema)


@mcp.tool()
def get_asset_history_tool(
guid=None, qualified_name=None, type_name=None, size=10, sort_order="DESC"
):
"""
Get the audit history of an asset by GUID or qualified name.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add information about how does the LLM fetch the guid or qualified name of the asset. Also this should be Get the audit history of an asset by GUID or qualified name/typename.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also do we need to support qualifiedname/typename or just guid should be enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe guid should be enough because there would be a very rare case when customer directly provides qualifiedname/typename and even if they do, claude can make search call to find the guid.

I will remove this.


Args:
guid (str, optional): GUID of the asset to get history for.
Either guid or qualified_name must be provided.
qualified_name (str, optional): Qualified name of the asset to get history for.
Either guid or qualified_name must be provided.
type_name (str, optional): Type name of the asset (required when using qualified_name).
Examples: "Table", "Column", "DbtModel", "AtlasGlossary"
size (int): Number of history entries to return. Defaults to 10.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a max size value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

sort_order (str): Sort order for results. "ASC" for oldest first, "DESC" for newest first.
Defaults to "DESC".

Returns:
Dict[str, Any]: Dictionary containing:
- entityAudits: List of audit entries with details about each change
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we follow a proper pattern. The argument names are snake case but dictionary keys are camelcase

- count: Number of audit entries returned
- totalCount: Total number of audit entries available
- errors: List of any errors encountered

Examples:
# Get history by GUID
history = get_asset_history_tool(
guid="6fc01478-1263-42ae-b8ca-c4a57da51392",
size=20,
sort_order="DESC",
)

# Get history by qualified name
history = get_asset_history_tool(
qualified_name="default/dbt/1755018137/account/258239/project/376530/model.simple_column_lineage.order_maths",
type_name="DbtModel",
size=15,
sort_order="ASC"
)
"""
return get_asset_history(
guid=guid,
qualified_name=qualified_name,
type_name=type_name,
size=size,
sort_order=sort_order,
)


@mcp.tool()
def create_glossaries(glossaries) -> List[Dict[str, Any]]:
"""
Expand Down
3 changes: 2 additions & 1 deletion modelcontextprotocol/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from .search import search_assets
from .dsl import get_assets_by_dsl
from .lineage import traverse_lineage
from .assets import update_assets
from .query import query_asset
from .assets import update_assets, get_asset_history
from .glossary import (
create_glossary_category_assets,
create_glossary_assets,
Expand All @@ -24,6 +24,7 @@
"traverse_lineage",
"update_assets",
"query_asset",
"get_asset_history",
"create_glossary_category_assets",
"create_glossary_assets",
"create_glossary_term_assets",
Expand Down
93 changes: 92 additions & 1 deletion modelcontextprotocol/tools/assets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import List, Union, Dict, Any
from typing import List, Union, Dict, Any, Optional
from client import get_atlan_client
from .models import (
UpdatableAsset,
Expand All @@ -10,6 +10,12 @@
)
from pyatlan.model.assets import Readme, AtlasGlossaryTerm
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
from utils.asset_history import (
validate_asset_history_params,
create_audit_search_request,
process_audit_result,
create_sort_item,
)

# Initialize logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -184,3 +190,88 @@ def update_assets(
error_msg = f"Error updating assets: {str(e)}"
logger.error(error_msg)
return {"updated_count": 0, "errors": [error_msg]}


def get_asset_history(
guid: Optional[str] = None,
qualified_name: Optional[str] = None,
type_name: Optional[str] = None,
size: int = 10,
sort_order: str = "DESC",
) -> Dict[str, Any]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use pYdantic objects return rather than dicts

"""
Get the audit history of an asset by GUID or qualified name.

Args:
guid (Optional[str]): GUID of the asset to get history for.
Either guid or qualified_name must be provided.
qualified_name (Optional[str]): Qualified name of the asset to get history for.
Either guid or qualified_name must be provided.
type_name (Optional[str]): Type name of the asset (required when using qualified_name).
Examples: "Table", "Column", "DbtModel", "AtlasGlossary"
size (int): Number of history entries to return. Defaults to 10.
sort_order (str): Sort order for results. "ASC" for oldest first, "DESC" for newest first.
Defaults to "DESC".

Returns:
Dict[str, Any]: Dictionary containing:
- entityAudits: List of audit entries
- count: Number of audit entries returned
- totalCount: Total number of audit entries available
- errors: List of any errors encountered

Raises:
Exception: If there's an error retrieving the asset history
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DOes it raise exceptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in cases when API key does not have enough permissions.

"""
try:
# Validate input parameters
validation_error = validate_asset_history_params(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use pydantic validations. We won't have to write validation rules

guid, qualified_name, type_name, sort_order
)
if validation_error:
logger.error(validation_error)
return {
"errors": [validation_error],
"entityAudits": [],
"count": 0,
"totalCount": 0,
}

logger.info(
f"Retrieving asset history with parameters: guid={guid}, qualified_name={qualified_name}, size={size}"
)

# Get Atlan client
client = get_atlan_client()

# Create sort item
sort_item = create_sort_item(sort_order)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need separate functions for these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, I will remove this.


# Create and execute audit search request
request = create_audit_search_request(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a separate function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noted, will remove.

guid, qualified_name, type_name, size, sort_item
)
response = client.audit.search(criteria=request, bulk=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should catch exceptions and return them as errors since you have defined the tool in that way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exception handling is properly implemented here, it is in try catch block.


# Process audit results - use current_page() to respect size parameter
entity_audits = [
process_audit_result(result) for result in response.current_page()
]

result_data = {
"entityAudits": entity_audits,
"count": len(entity_audits),
"totalCount": response.total_count,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the difference between count and totalCount?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Count is the number of audit entries that are now being sent by the MCP, which should be actually equal to the number of entries requested by the user in the request. And total_count is the total number of entries that can be obtained from Atlas.

"errors": [],
}

logger.info(
f"Successfully retrieved {len(entity_audits)} audit entries for asset"
)
return result_data

except Exception as e:
error_msg = f"Error retrieving asset history: {str(e)}"
logger.error(error_msg)
logger.exception("Exception details:")
return {"errors": [error_msg], "entityAudits": [], "count": 0, "totalCount": 0}
153 changes: 153 additions & 0 deletions modelcontextprotocol/utils/asset_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""
Utility functions for asset history operations.
This module contains helper functions for retrieving and processing asset audit history.
"""

import logging
from typing import Dict, Any, Optional
from pyatlan.client.audit import AuditSearchRequest
from pyatlan.model.search import DSL, Bool, SortItem, Term
from pyatlan.model.enums import SortOrder

# Initialize logging
logger = logging.getLogger(__name__)


def validate_asset_history_params(
guid: Optional[str],
qualified_name: Optional[str],
type_name: Optional[str],
sort_order: str,
) -> Optional[str]:
"""
Validate input parameters for asset history retrieval.
Args:
guid: Asset GUID
qualified_name: Asset qualified name
type_name: Asset type name
sort_order: Sort order
Returns:
Error message if validation fails, None if valid
"""
if not guid and not qualified_name:
return "Either guid or qualified_name must be provided"

if qualified_name and not type_name:
return "type_name is required when using qualified_name"

if sort_order not in ["ASC", "DESC"]:
return "sort_order must be either 'ASC' or 'DESC'"

return None


def create_audit_search_request(
guid: Optional[str],
qualified_name: Optional[str],
type_name: Optional[str],
size: int,
sort_item: SortItem,
) -> AuditSearchRequest:
"""
Create an AuditSearchRequest based on the provided parameters.
Args:
guid: Asset GUID
qualified_name: Asset qualified name
type_name: Asset type name
size: Number of results to return
sort_item: Sort configuration
Returns:
Configured AuditSearchRequest
"""
if guid:
dsl = DSL(
query=Bool(filter=[Term(field="entityId", value=guid)]),
sort=[sort_item],
size=size,
)
logger.debug(f"Created audit search request by GUID: {guid}")
else:
dsl = DSL(
query=Bool(
must=[
Term(field="entityQualifiedName", value=qualified_name),
Term(field="typeName", value=type_name),
]
),
sort=[sort_item],
size=size,
)
logger.debug(
f"Created audit search request by qualified name: {qualified_name}"
)

return AuditSearchRequest(dsl=dsl)


def extract_basic_audit_info(result) -> Dict[str, Any]:
"""
Extract basic audit information from a result object.
Args:
result: Audit result object
Returns:
Dictionary with basic audit information
"""
return {
"entityQualifiedName": getattr(result, "entity_qualified_name", None),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to send qf, guid, typename? considering we are fetching it for that entity only. It will just use up token

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing qf and typename

"guid": getattr(result, "entity_id", None),
"typeName": getattr(result, "type_name", None),
"timestamp": getattr(result, "timestamp", None),
"action": getattr(result, "action", None),
"user": getattr(result, "user", None),
"created": getattr(result, "created", None),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets remove created as well

}


def process_audit_result(result) -> Dict[str, Any]:
"""
Process a single audit result into a formatted dictionary.
Args:
result: Audit result object
Returns:
Formatted audit entry dictionary
"""
try:
# Extract basic audit information
audit_entry = extract_basic_audit_info(result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to the main function only

updates = result.detail.dict(exclude_unset=True)
audit_entry.update(updates)

return audit_entry

except Exception as e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this exception be raised?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I checked it is all handled. I will remove this.

logger.warning(f"Error processing audit result: {e}")
return {
"error": f"Failed to process audit entry: {str(e)}",
"entityQualifiedName": "Unknown",
"guid": "Unknown",
}


def create_sort_item(sort_order: str) -> SortItem:
"""
Create a SortItem based on the sort order.
Args:
sort_order: Sort order ("ASC" or "DESC")
Returns:
Configured SortItem
"""
return SortItem(
"created",
order=SortOrder.DESCENDING if sort_order == "DESC" else SortOrder.ASCENDING,
)