Skip to content

Commit 48e4356

Browse files
committed
Add decorator: automation_activity
When setup_workflow is called, we will register those activities which are decorated w @automation_activity(). This will register them into Atlas for discoverability in the automation engine
1 parent 76420ae commit 48e4356

File tree

4 files changed

+206
-1
lines changed

4 files changed

+206
-1
lines changed

.env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ ATLAN_WORKFLOW_MAX_TIMEOUT_HOURS=1
1111
ATLAN_MAX_CONCURRENT_ACTIVITIES=5
1212
ATLAN_APP_DASHBOARD_HOST=localhost
1313
ATLAN_APP_DASHBOARD_PORT=3000
14+
AUTOMATION_ENGINE_API_HOST=localhost
15+
AUTOMATION_ENGINE_API_PORT=8080
1416
ATLAN_APP_HTTP_HOST=0.0.0.0
1517
ATLAN_APP_HTTP_PORT=8000
1618
ATLAN_WORKFLOW_METRICS_PORT=8234

application_sdk/application/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from application_sdk.activities import ActivitiesInterface
55
from application_sdk.clients.base import BaseClient
66
from application_sdk.clients.utils import get_workflow_client
7-
from application_sdk.constants import ENABLE_MCP
7+
from application_sdk.constants import AUTOMATION_ENGINE_API_URL, ENABLE_MCP
8+
from application_sdk.decorators.automation_activity import flush_registrations
89
from application_sdk.events.models import EventRegistration
910
from application_sdk.handlers.base import BaseHandler
1011
from application_sdk.observability.logger_adaptor import get_logger
@@ -152,6 +153,23 @@ async def setup_workflow(
152153
workflow_and_activities_classes=workflow_and_activities_classes
153154
)
154155

156+
# Register activities via HTTP API if automation engine API URL is configured
157+
if AUTOMATION_ENGINE_API_URL:
158+
app_qualified_name: str = f"default/apps/{self.application_name}"
159+
try:
160+
logger.info("Registering activities via HTTP API for automation engine")
161+
flush_registrations(
162+
api_base_url=AUTOMATION_ENGINE_API_URL,
163+
app_qualified_name=app_qualified_name,
164+
app_name=self.application_name,
165+
)
166+
except Exception as e:
167+
logger.warning(
168+
f"Failed to register activities via HTTP API for automation engine: {e}. "
169+
"This is non-blocking and the application will continue to run. "
170+
"Check if the automation engine is deployed and accessible."
171+
)
172+
155173
async def start_workflow(self, workflow_args, workflow_class) -> Any:
156174
"""
157175
Start a new workflow execution.

application_sdk/constants.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@
4949
SQL_SERVER_MIN_VERSION = os.getenv("ATLAN_SQL_SERVER_MIN_VERSION")
5050
#: Path to the SQL queries directory
5151
SQL_QUERIES_PATH = os.getenv("ATLAN_SQL_QUERIES_PATH", "app/sql")
52+
#: Host address for the automation engine API server
53+
AUTOMATION_ENGINE_API_HOST = os.getenv("AUTOMATION_ENGINE_API_HOST", "localhost")
54+
#: Port number for the automation engine API server
55+
AUTOMATION_ENGINE_API_PORT = os.getenv("AUTOMATION_ENGINE_API_PORT", "8080")
56+
#: Base URL for automation engine API server (constructed from host and port)
57+
AUTOMATION_ENGINE_API_URL = (
58+
f"http://{AUTOMATION_ENGINE_API_HOST}:{AUTOMATION_ENGINE_API_PORT}"
59+
)
5260

5361
# Output Path Constants
5462
#: Output path format for workflows.
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
"""
2+
Activity registration decorator for automatic registration with the automation engine.
3+
"""
4+
5+
import inspect
6+
import json
7+
from typing import Any, Callable, Dict, List, Union, get_args, get_origin
8+
9+
import requests
10+
11+
from application_sdk.observability.logger_adaptor import get_logger
12+
from pydantic import BaseModel
13+
14+
logger = get_logger(__name__)
15+
16+
17+
# Global registry to collect decorated activities
18+
ACTIVITY_REGISTRY: List[dict[str, Any]] = []
19+
20+
21+
def _type_to_json_schema(annotation: Any) -> Dict[str, Any]:
22+
"""Convert Python type annotation to JSON Schema using reflection."""
23+
if inspect.isclass(annotation) and issubclass(annotation, BaseModel):
24+
schema = annotation.model_json_schema(mode="serialization")
25+
if "$defs" in schema:
26+
defs = schema.pop("$defs")
27+
schema = {**schema, **defs}
28+
return schema
29+
30+
origin = get_origin(annotation)
31+
if origin is Union:
32+
args = get_args(annotation)
33+
if type(None) in args:
34+
non_none_types = [arg for arg in args if arg is not type(None)]
35+
if non_none_types:
36+
schema = _type_to_json_schema(non_none_types[0])
37+
schema["nullable"] = True
38+
return schema
39+
40+
if origin is dict or annotation is dict:
41+
args = get_args(annotation)
42+
if args:
43+
return {
44+
"type": "object",
45+
"additionalProperties": _type_to_json_schema(args[1]),
46+
}
47+
return {"type": "object"}
48+
49+
if origin is list or annotation is list:
50+
args = get_args(annotation)
51+
if args:
52+
return {"type": "array", "items": _type_to_json_schema(args[0])}
53+
return {"type": "array"}
54+
55+
type_mapping = {
56+
str: {"type": "string"},
57+
int: {"type": "integer"},
58+
float: {"type": "number"},
59+
bool: {"type": "boolean"},
60+
Any: {},
61+
}
62+
63+
if annotation in type_mapping:
64+
return type_mapping[annotation]
65+
66+
return {}
67+
68+
69+
def _generate_input_schema(func: Any) -> Dict[str, Any]:
70+
"""Generate JSON Schema for function inputs using reflection."""
71+
sig = inspect.signature(func)
72+
properties = {}
73+
required: list[str] = []
74+
75+
for param_name, param in sig.parameters.items():
76+
if param_name == "self":
77+
continue
78+
79+
param_schema = (
80+
_type_to_json_schema(param.annotation)
81+
if param.annotation != inspect.Parameter.empty
82+
else {}
83+
)
84+
85+
if param.default != inspect.Parameter.empty:
86+
if isinstance(param.default, (str, int, float, bool, type(None))):
87+
param_schema["default"] = param.default
88+
else:
89+
required.append(param_name)
90+
91+
properties[param_name] = param_schema
92+
93+
schema = {"type": "object", "properties": properties}
94+
if required:
95+
schema["required"] = required
96+
97+
return schema
98+
99+
100+
def _generate_output_schema(func: Any) -> Dict[str, Any]:
101+
"""Generate JSON Schema for function outputs using reflection."""
102+
return_annotation = inspect.signature(func).return_annotation
103+
if return_annotation == inspect.Signature.empty:
104+
return {}
105+
return _type_to_json_schema(return_annotation)
106+
107+
108+
def automation_activity(
109+
name: str,
110+
description: str,
111+
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
112+
"""Decorator to mark an activity for automatic registration."""
113+
114+
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
115+
input_schema: dict[str, Any] = _generate_input_schema(func)
116+
output_schema: dict[str, Any] = _generate_output_schema(func)
117+
118+
logger.info(f"Collected automation activity: {name}")
119+
ACTIVITY_REGISTRY.append(
120+
{
121+
"name": name,
122+
"description": description,
123+
"func": func,
124+
"input_schema": json.dumps(input_schema) if input_schema else None,
125+
"output_schema": json.dumps(output_schema) if output_schema else None,
126+
}
127+
)
128+
return func
129+
130+
return decorator
131+
132+
133+
def flush_registrations(
134+
api_base_url: str,
135+
app_qualified_name: str,
136+
app_name: str,
137+
) -> None:
138+
"""Flush all collected registrations by calling the activities create API via HTTP."""
139+
if not ACTIVITY_REGISTRY:
140+
logger.info("No activities to register")
141+
return
142+
143+
logger.info(f"Registering {len(ACTIVITY_REGISTRY)} activities with automation engine")
144+
145+
# Build tools payload without function objects (not JSON serializable)
146+
tools = [
147+
{
148+
"name": item["name"],
149+
"description": item["description"],
150+
"input_schema": item["input_schema"],
151+
"output_schema": item["output_schema"],
152+
}
153+
for item in ACTIVITY_REGISTRY
154+
]
155+
156+
payload = {
157+
"app_qualified_name": app_qualified_name,
158+
"app_name": app_name,
159+
"tools": tools,
160+
}
161+
162+
response: requests.Response = requests.post(
163+
f"{api_base_url}/api/tools",
164+
json=payload,
165+
timeout=30.0,
166+
)
167+
response.raise_for_status()
168+
result = response.json()
169+
170+
if result.get("status") == "success":
171+
logger.info(
172+
f"Successfully registered {len(tools)} activities with automation engine"
173+
)
174+
else:
175+
logger.error(
176+
f"Failed to register activities with automation engine: {result.get('message')}"
177+
)

0 commit comments

Comments
 (0)