Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions modelcontextprotocol/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def search_assets_tool(
domain_guids=None,
date_range=None,
guids=None,
workflow_run_type=None,
workflow_run_status=None,
include_attributes_extra=None,
):
"""
Advanced asset search using FluentSearch with flexible conditions.
Expand Down Expand Up @@ -60,6 +63,10 @@ def search_assets_tool(
date_range (Dict[str, Dict[str, Any]], optional): Date range filters.
Format: {"attribute_name": {"gte": start_timestamp, "lte": end_timestamp}}
guids (List[str], optional): List of asset GUIDs to filter by.
workflow_run_type (str, optional): Workflow run type to filter by.
workflow_run_status (str, optional): Workflow run status to filter by.
include_attributes_extra (Dict[str, str], optional): Additional attributes to include in results.
Format: {"asset_type": "attribute_name"}

Returns:
List[Asset]: List of assets matching the search criteria
Expand Down Expand Up @@ -154,6 +161,14 @@ def search_assets_tool(
}
)

# get business policy that are pending for approval, here the attribute workflow_run_on_asset_guid is the business policy guid that should be used to fetch the business policy details
assets = search_assets(
"asset_type"="WorkflowRun",
"workflow_run_type"="POLICY",
"workflow_run_status"="PENDING",
include_attributes_extra={"WorkflowRun": "workflow_run_on_asset_guid"}
)

"""
return search_assets(
conditions,
Expand All @@ -173,6 +188,9 @@ def search_assets_tool(
domain_guids,
date_range,
guids,
workflow_run_type,
workflow_run_status,
include_attributes_extra,
)


Expand Down
29 changes: 28 additions & 1 deletion modelcontextprotocol/tools/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pyatlan.model.assets import Asset
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
from pyatlan.model.fields.atlan_fields import AtlanField
from pyatlan.model.assets.workflow_run import WorkflowRun

# Configure logging
logger = logging.getLogger(__name__)
Expand All @@ -29,6 +30,9 @@ def search_assets(
domain_guids: Optional[List[str]] = None,
date_range: Optional[Dict[str, Dict[str, Any]]] = None,
guids: Optional[List[str]] = None,
workflow_run_type: Optional[str] = None,
workflow_run_status: Optional[str] = None,
include_attributes_extra: Optional[Dict[str, str]] = None,
) -> List[Asset]:
"""
Advanced asset search using FluentSearch with flexible conditions.
Expand Down Expand Up @@ -57,7 +61,10 @@ def search_assets(
date_range (Dict[str, Dict[str, Any]], optional): Date range filters.
Format: {"attribute_name": {"gte": start_timestamp, "lte": end_timestamp}}
guids (List[str], optional): List of GUIDs to filter by.

workflow_run_type (str, optional): Workflow run type to filter by.
workflow_run_status (str, optional): Workflow run status to filter by.
include_attributes_extra (Dict[str, str], optional): Additional attributes to include in results.
Format: {"asset_type": "attribute_name"}

Returns:
List[Asset]: List of assets matching the search criteria
Expand Down Expand Up @@ -107,6 +114,18 @@ def search_assets(
Asset.QUALIFIED_NAME.startswith(connection_qualified_name)
)

# Apply workflow run type filter if provided
if workflow_run_type:
logger.debug(f"Filtering by workflow run type: {workflow_run_type}")
search = search.where(WorkflowRun.WORKFLOW_RUN_TYPE.eq(workflow_run_type))

# Apply workflow run status filter if provided
if workflow_run_status:
logger.debug(f"Filtering by workflow run status: {workflow_run_status}")
search = search.where(
WorkflowRun.WORKFLOW_RUN_STATUS.eq(workflow_run_status)
)

# Apply tags filter if provided
if tags and len(tags) > 0:
logger.debug(
Expand Down Expand Up @@ -342,6 +361,14 @@ def search_assets(

logger.debug(f"Included {included_count} attributes in results")

if include_attributes_extra:
for asset_type, attr_type in include_attributes_extra.items():
if asset_type == "WorkflowRun":
attr_obj = getattr(WorkflowRun, attr_type.upper(), None)
search = search.include_on_results(attr_obj)
else:
logger.warning(f"Unknown attribute for inclusion: {attr}, skipping")

# Set pagination
logger.debug(f"Setting pagination: limit={limit}, offset={offset}")
search = search.page_size(limit)
Expand Down