Skip to content

Commit aa041c6

Browse files
committed
Add decorator: automation_activity
When setup_workflow is called, we will register those activities which are decorated w @automation_activity(). This will register them into Atlas for discoverability in the automation engine
1 parent 76420ae commit aa041c6

File tree

5 files changed

+236
-0
lines changed

5 files changed

+236
-0
lines changed

.env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ ATLAN_WORKFLOW_MAX_TIMEOUT_HOURS=1
1111
ATLAN_MAX_CONCURRENT_ACTIVITIES=5
1212
ATLAN_APP_DASHBOARD_HOST=localhost
1313
ATLAN_APP_DASHBOARD_PORT=3000
14+
AUTOMATION_ENGINE_API_HOST=localhost
15+
AUTOMATION_ENGINE_API_PORT=8080
1416
ATLAN_APP_HTTP_HOST=0.0.0.0
1517
ATLAN_APP_HTTP_PORT=8000
1618
ATLAN_WORKFLOW_METRICS_PORT=8234

application_sdk/application/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
import asyncio
12
from concurrent.futures import ThreadPoolExecutor
23
from typing import Any, Dict, List, Optional, Tuple, Type
34

45
from application_sdk.activities import ActivitiesInterface
56
from application_sdk.clients.base import BaseClient
67
from application_sdk.clients.utils import get_workflow_client
78
from application_sdk.constants import ENABLE_MCP
9+
from application_sdk.decorators import ACTIVITY_SPECS, flush_activity_registrations
810
from application_sdk.events.models import EventRegistration
911
from application_sdk.handlers.base import BaseHandler
1012
from application_sdk.observability.logger_adaptor import get_logger
@@ -152,6 +154,20 @@ async def setup_workflow(
152154
workflow_and_activities_classes=workflow_and_activities_classes
153155
)
154156

157+
# Register activities via HTTP API for automation engine (non-blocking)
158+
# The 5 second delay allows the automation engine's server to come up
159+
async def _register_activities_with_delay():
160+
"""Register activities after a 5 second delay to allow automation
161+
engine server to start."""
162+
await asyncio.sleep(5)
163+
await asyncio.to_thread(
164+
flush_activity_registrations,
165+
app_name=self.application_name,
166+
activity_specs=ACTIVITY_SPECS,
167+
)
168+
169+
asyncio.create_task(_register_activities_with_delay())
170+
155171
async def start_workflow(self, workflow_args, workflow_class) -> Any:
156172
"""
157173
Start a new workflow execution.

application_sdk/constants.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@
4949
SQL_SERVER_MIN_VERSION = os.getenv("ATLAN_SQL_SERVER_MIN_VERSION")
5050
#: Path to the SQL queries directory
5151
SQL_QUERIES_PATH = os.getenv("ATLAN_SQL_QUERIES_PATH", "app/sql")
52+
#: Host address for the automation engine API server
53+
AUTOMATION_ENGINE_API_HOST = os.getenv("AUTOMATION_ENGINE_API_HOST", "localhost")
54+
#: Port number for the automation engine API server
55+
AUTOMATION_ENGINE_API_PORT = os.getenv("AUTOMATION_ENGINE_API_PORT", "8080")
56+
#: Base URL for automation engine API server (constructed from host and port)
57+
AUTOMATION_ENGINE_API_URL = (
58+
f"http://{AUTOMATION_ENGINE_API_HOST}:{AUTOMATION_ENGINE_API_PORT}"
59+
)
5260

5361
# Output Path Constants
5462
#: Output path format for workflows.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .automation_activity import ACTIVITY_SPECS, flush_activity_registrations
2+
3+
__all__: list[str] = ["ACTIVITY_SPECS", "flush_activity_registrations"]
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
"""
2+
Activity registration decorator for automatic registration with the automation engine.
3+
"""
4+
5+
import inspect
6+
import json
7+
from typing import Any, Callable, Dict, List, Union, get_args, get_origin
8+
9+
from pydantic import BaseModel
10+
import requests
11+
12+
from application_sdk.constants import AUTOMATION_ENGINE_API_URL
13+
from application_sdk.observability.logger_adaptor import get_logger
14+
15+
logger = get_logger(__name__)
16+
17+
18+
# Global registry to collect decorated activities
19+
ACTIVITY_SPECS: List[dict[str, Any]] = []
20+
21+
22+
def _type_to_json_schema(annotation: Any) -> Dict[str, Any]:
23+
"""Convert Python type annotation to JSON Schema using reflection."""
24+
if inspect.isclass(annotation) and issubclass(annotation, BaseModel):
25+
schema = annotation.model_json_schema(mode="serialization")
26+
if "$defs" in schema:
27+
defs = schema.pop("$defs")
28+
schema = {**schema, **defs}
29+
return schema
30+
31+
origin = get_origin(annotation)
32+
if origin is Union:
33+
args = get_args(annotation)
34+
if type(None) in args:
35+
non_none_types = [arg for arg in args if arg is not type(None)]
36+
if non_none_types:
37+
schema = _type_to_json_schema(non_none_types[0])
38+
schema["nullable"] = True
39+
return schema
40+
41+
if origin is dict or annotation is dict:
42+
args = get_args(annotation)
43+
if args:
44+
return {
45+
"type": "object",
46+
"additionalProperties": _type_to_json_schema(args[1]),
47+
}
48+
return {"type": "object"}
49+
50+
if origin is list or annotation is list:
51+
args = get_args(annotation)
52+
if args:
53+
return {"type": "array", "items": _type_to_json_schema(args[0])}
54+
return {"type": "array"}
55+
56+
type_mapping = {
57+
str: {"type": "string"},
58+
int: {"type": "integer"},
59+
float: {"type": "number"},
60+
bool: {"type": "boolean"},
61+
Any: {},
62+
}
63+
64+
if annotation in type_mapping:
65+
return type_mapping[annotation]
66+
67+
return {}
68+
69+
70+
def _generate_input_schema(func: Any) -> Dict[str, Any]:
71+
"""Generate JSON Schema for function inputs using reflection."""
72+
sig = inspect.signature(func)
73+
properties = {}
74+
required: list[str] = []
75+
76+
for param_name, param in sig.parameters.items():
77+
if param_name == "self":
78+
continue
79+
80+
param_schema = (
81+
_type_to_json_schema(param.annotation)
82+
if param.annotation != inspect.Parameter.empty
83+
else {}
84+
)
85+
86+
if param.default != inspect.Parameter.empty:
87+
if isinstance(param.default, (str, int, float, bool, type(None))):
88+
param_schema["default"] = param.default
89+
else:
90+
required.append(param_name)
91+
92+
properties[param_name] = param_schema
93+
94+
schema = {"type": "object", "properties": properties}
95+
if required:
96+
schema["required"] = required
97+
98+
return schema
99+
100+
101+
def _generate_output_schema(func: Any) -> Dict[str, Any]:
102+
"""Generate JSON Schema for function outputs using reflection."""
103+
return_annotation = inspect.signature(func).return_annotation
104+
if return_annotation == inspect.Signature.empty:
105+
return {}
106+
return _type_to_json_schema(return_annotation)
107+
108+
109+
def automation_activity(
110+
name: str,
111+
description: str,
112+
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
113+
"""Decorator to mark an activity for automatic registration."""
114+
115+
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
116+
input_schema: dict[str, Any] = _generate_input_schema(func)
117+
output_schema: dict[str, Any] = _generate_output_schema(func)
118+
119+
logger.info(f"Collected automation activity: {name}")
120+
ACTIVITY_SPECS.append(
121+
{
122+
"name": name,
123+
"description": description,
124+
"func": func,
125+
"input_schema": json.dumps(input_schema) if input_schema else None,
126+
"output_schema": json.dumps(output_schema) if output_schema else None,
127+
}
128+
)
129+
return func
130+
131+
return decorator
132+
133+
134+
def flush_activity_registrations(
135+
app_name: str,
136+
activity_specs: List[dict[str, Any]],
137+
) -> None:
138+
"""Flush all collected registrations by calling the activities create API via HTTP."""
139+
if not activity_specs:
140+
logger.info("No activities to register")
141+
return
142+
143+
if not AUTOMATION_ENGINE_API_URL:
144+
logger.warning(
145+
"Automation engine API URL not configured. Skipping activity registration."
146+
)
147+
return
148+
149+
# Perform health check first
150+
try:
151+
health_check_url: str = f"{AUTOMATION_ENGINE_API_URL}/api/health"
152+
health_response: requests.Response = requests.get(health_check_url, timeout=5.0)
153+
health_response.raise_for_status()
154+
logger.info("Automation engine health check passed")
155+
except Exception as e:
156+
logger.warning(
157+
f"Automation engine health check failed: {e}. "
158+
"Skipping activity registration. "
159+
"Check if the automation engine is deployed and accessible."
160+
)
161+
return
162+
163+
logger.info(
164+
f"Registering {len(ACTIVITY_SPECS)} activities with automation engine"
165+
)
166+
167+
# Generate app qualified name
168+
app_qualified_name: str = f"default/apps/{app_name}"
169+
170+
# Build tools payload without function objects (not JSON serializable)
171+
tools = [
172+
{
173+
"name": item["name"],
174+
"description": item["description"],
175+
"input_schema": item["input_schema"],
176+
"output_schema": item["output_schema"],
177+
}
178+
for item in activity_specs
179+
]
180+
181+
payload = {
182+
"app_qualified_name": app_qualified_name,
183+
"app_name": app_name,
184+
"tools": tools,
185+
}
186+
187+
try:
188+
response: requests.Response = requests.post(
189+
f"{AUTOMATION_ENGINE_API_URL}/api/tools",
190+
json=payload,
191+
timeout=30.0,
192+
)
193+
response.raise_for_status()
194+
result = response.json()
195+
196+
if result.get("status") == "success":
197+
logger.info(
198+
f"Successfully registered {len(tools)} activities with automation engine"
199+
)
200+
else:
201+
logger.warning(
202+
f"Failed to register activities with automation engine: {result.get('message')}"
203+
)
204+
except Exception as e:
205+
raise Exception(
206+
f"Failed to register activities with automation engine: {e}"
207+
) from e

0 commit comments

Comments
 (0)