Skip to content

Commit 8dbe72b

Browse files
ADX-202 Support Copying of Assets via Pipeline DSL (#1040)
* ADX-202 Support Copying of Assets via Pipeline DSL
* fix formatting errors
* fix: address review feedback on asset handling in codegen
  - Use dirs_exist_ok=True in copytree for idempotent reruns
  - Cache resolved assets list to avoid double resolution
  - Handle os.PathLike alongside str for single-asset values
  - Add regression test for duplicate asset basename

Agent-Logs-Url: https://github.com/Clarifai/clarifai-python/sessions/d76e1973-72c2-47e6-81ef-99bd10a4e6e3
Co-authored-by: nitinbhojwani <9331380+nitinbhojwani@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: nitinbhojwani <9331380+nitinbhojwani@users.noreply.github.com>
1 parent 7d3ea33 commit 8dbe72b

9 files changed

Lines changed: 158 additions & 4 deletions

File tree

clarifai/runners/pipelines/codegen.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import ast
22
import inspect
33
import os
4+
import shutil
45
import subprocess
56
import textwrap
67
from typing import List, Sequence, Set
@@ -37,6 +38,8 @@ def _get_node_source(source_lines: Sequence[str], node: ast.AST) -> str:
3738
}
3839
)
3940

41+
# Basenames that step assets may not use: the step script is generated as
# 'pipeline_step.py' in the step's version directory, and assets are copied
# into that same directory under their own basename.
_RESERVED_STEP_ASSET_NAMES = frozenset({'pipeline_step.py'})
42+
4043

4144
def _is_dsl_only_import(node: ast.AST) -> bool:
4245
"""Return True if an import node pulls exclusively from DSL-only modules."""
@@ -149,7 +152,62 @@ def _build_step_script(step_definition) -> str:
149152
)
150153

151154

155+
def _resolve_step_assets(step_definition) -> list:
    """Resolve and validate asset paths for a step.

    Relative asset paths are resolved against the directory of the file that
    defines the step function; absolute paths are used as given. Blank entries
    are ignored.

    Args:
        step_definition: Step object exposing ``func``, ``id`` and ``assets``.
            ``assets`` may be a single ``str``/``os.PathLike`` or an iterable
            of them.

    Returns:
        A list of ``(normalized_path, asset_name)`` tuples in declaration
        order. Empty when the step declares no (non-blank) assets.

    Raises:
        ValueError: If assets are declared but the step's source file cannot be
            determined, an asset path does not exist, an asset path has no
            basename, basenames are duplicated, or a basename collides with a
            generated file.
    """
    assets = step_definition.assets
    if isinstance(assets, (str, os.PathLike)):
        # A bare path is shorthand for a one-element asset list.
        assets = [assets]

    raw_paths = []
    for asset in assets or ():
        # os.fsdecode honours __fspath__; str() of an arbitrary PathLike is not
        # guaranteed to be its filesystem path.
        raw_path = (os.fsdecode(asset) if isinstance(asset, os.PathLike) else str(asset)).strip()
        if raw_path:
            raw_paths.append(raw_path)

    # Nothing to resolve: do not demand a source file from steps without assets.
    if not raw_paths:
        return []

    try:
        source_file = inspect.getsourcefile(step_definition.func)
    except TypeError:
        # inspect raises TypeError for builtins / C callables; report it through
        # the same ValueError as any other step without a source file.
        source_file = None
    if source_file is None:
        raise ValueError(f'Could not determine source file for step {step_definition.id}')

    source_dir = os.path.dirname(os.path.abspath(source_file))
    resolved = []
    seen_names = set()

    for raw_path in raw_paths:
        asset_path = raw_path if os.path.isabs(raw_path) else os.path.join(source_dir, raw_path)
        normalized_asset_path = os.path.normpath(asset_path)

        if not os.path.exists(normalized_asset_path):
            raise ValueError(f"Step '{step_definition.id}' asset path does not exist: {raw_path}")

        asset_name = os.path.basename(normalized_asset_path)
        if not asset_name:
            # A filesystem root normalizes to a path with an empty basename; the
            # copy destination would then be the version directory itself.
            raise ValueError(f"Step '{step_definition.id}' asset path has no basename: {raw_path}")
        if asset_name in _RESERVED_STEP_ASSET_NAMES:
            raise ValueError(
                f"Step '{step_definition.id}' asset basename is reserved: {asset_name}"
            )
        if asset_name in seen_names:
            raise ValueError(
                f"Step '{step_definition.id}' has duplicate asset basename: {asset_name}"
            )
        seen_names.add(asset_name)
        resolved.append((normalized_asset_path, asset_name))

    return resolved
196+
197+
198+
def _copy_step_assets(resolved: list, version_dir: str) -> None:
    """Copy each resolved asset into ``version_dir`` under its basename.

    Args:
        resolved: ``(source_path, asset_name)`` tuples as returned by
            ``_resolve_step_assets``.
        version_dir: Directory that receives the copies.
    """
    for source_path, name in resolved:
        target = os.path.join(version_dir, name)
        if not os.path.isdir(source_path):
            # Plain file: copy contents together with metadata.
            shutil.copy2(source_path, target)
            continue
        # Directory: merge into an existing target so reruns do not fail.
        shutil.copytree(source_path, target, dirs_exist_ok=True)
205+
206+
152207
def generate_step_directory(step_definition, output_dir: str, user_id: str, app_id: str) -> str:
208+
# Validate assets early so we get a clear error before doing any file I/O.
209+
resolved_assets = _resolve_step_assets(step_definition)
210+
153211
step_dir = os.path.join(output_dir, step_definition.id)
154212
version_dir = os.path.join(step_dir, '1')
155213
os.makedirs(version_dir, exist_ok=True)
@@ -177,6 +235,8 @@ def generate_step_directory(step_definition, output_dir: str, user_id: str, app_
177235
with open(step_script_path, 'w', encoding='utf-8') as handle:
178236
handle.write(_build_step_script(step_definition))
179237

238+
_copy_step_assets(resolved_assets, version_dir)
239+
180240
_format_file(step_script_path)
181241

182242
return step_dir

clarifai/runners/pipelines/step.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,15 @@ def __init__(
9595
*,
9696
id: Optional[str] = None,
9797
requirements=None,
98+
assets=None,
9899
compute: Optional[ComputeInfo] = None,
99100
python_version: str = '3.12',
100101
secrets: Optional[Dict[str, str]] = None,
101102
):
102103
self.func = func
103104
self.id = id or func.__name__.replace('_', '-')
104105
self.requirements = requirements or []
106+
self.assets = assets or []
105107
self.compute = compute or ComputeInfo()
106108
self.python_version = python_version
107109
self.secrets = secrets or {}
@@ -210,6 +212,7 @@ def step(
210212
*,
211213
id: Optional[str] = None,
212214
requirements=None,
215+
assets=None,
213216
compute: Optional[ComputeInfo] = None,
214217
python_version: str = '3.12',
215218
secrets: Optional[Dict[str, str]] = None,
@@ -219,6 +222,7 @@ def decorator(func: Callable[..., Any]) -> StepDefinition:
219222
func,
220223
id=id,
221224
requirements=requirements,
225+
assets=assets,
222226
compute=compute,
223227
python_version=python_version,
224228
secrets=secrets,

examples/pipeline_dsl_text_pipeline.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,29 @@
3333

3434
def normalize_text(value: str) -> str:
3535
"""Small helper intentionally kept outside the step for codegen extraction."""
36-
return " ".join(value.strip().split())
36+
import importlib.util
37+
from pathlib import Path
38+
39+
module_path = Path(__file__).with_name("text_utils.py")
40+
spec = importlib.util.spec_from_file_location("text_utils", module_path)
41+
if spec is None or spec.loader is None:
42+
raise RuntimeError(f"Could not load helper module from {module_path}")
43+
44+
module = importlib.util.module_from_spec(spec)
45+
spec.loader.exec_module(module)
46+
return module.clean_text(value)
3747

3848

3949
@step(
4050
id="prepare-text",
4151
requirements=["transformers>=4.0"],
52+
assets=["./text_utils.py"],
4253
compute=ComputeInfo(cpu_limit="500m", cpu_memory="500Mi"),
4354
)
4455
def prepare_text(input_text: str) -> str:
4556
"""Normalize text before downstream processing."""
4657
cleaned = normalize_text(input_text)
47-
return cleaned.lower()
58+
return cleaned
4859

4960

5061
summarize = step_ref.from_url(

examples/text_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
def clean_text(value: str) -> str:
    """Return *value* lowercased, trimmed, with whitespace runs collapsed to one space."""
    # split() with no arguments already discards leading and trailing whitespace.
    tokens = value.split()
    return ' '.join(tokens).lower()

tests/cli/test_pipeline_dsl_cli.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,6 @@ def test_generate_real_example_pipeline_writes_mixed_step_config(tmp_path: Path)
6565
assert config['pipeline']['step_directories'] == ['prepare-text', 'assemble-report']
6666
assert tasks['summarize']['templateRef']['name'].endswith('/versions/summary-v1')
6767
assert tasks['classify-sentiment']['templateRef']['name'].endswith('/versions/sentiment-v3')
68+
assert (output_dir / 'prepare-text' / '1' / 'text_utils.py').exists()
6869
assert not (output_dir / 'summarize').exists()
6970
assert not (output_dir / 'classify-sentiment').exists()
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Fixture pipeline for the reserved-asset-name test: it is loaded with
# load_pipeline_from_file, and generating it is expected to raise ValueError.
from clarifai.runners.pipelines import Pipeline, step
2+
3+
4+
# The asset basename 'pipeline_step.py' is the same as the generated step script's name.
@step(id='collision-step', assets=['./pipeline_step.py'])
5+
def collision_step(input_text: str) -> str:
6+
return input_text
7+
8+
9+
with Pipeline(id='asset-pipeline', user_id='me', app_id='my-app') as pipeline:
10+
raw_text = pipeline.input('input_text')
11+
collision_step(input_text=raw_text)

tests/runners/pipeline_step.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Test fixture: an asset file whose basename is the reserved 'pipeline_step.py';
# the reserved-asset fixture pipeline references it as './pipeline_step.py'.
print('asset file')

tests/runners/sample_module.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
def clean_text(value: str) -> str:
    """Normalize *value*: collapse whitespace to single spaces, trim, and lowercase."""
    # Argument-less split() drops surrounding whitespace, so no separate trim is needed.
    collapsed = ' '.join(value.split())
    return collapsed.lower()

tests/runners/test_pipeline_dsl.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,33 @@
44
import pytest
55
import yaml
66

7-
from clarifai.runners.pipelines import ComputeInfo, Pipeline, step, step_ref
7+
from clarifai.runners.pipelines import (
8+
ComputeInfo,
9+
Pipeline,
10+
load_pipeline_from_file,
11+
step,
12+
step_ref,
13+
)
814
from clarifai.runners.utils.pipeline_validation import PipelineConfigValidator
915

1016

1117
def normalize_text(value: str) -> str:
12-
return ' '.join(value.strip().split())
18+
import importlib.util
19+
20+
module_path = Path(__file__).with_name('sample_module.py')
21+
spec = importlib.util.spec_from_file_location('sample_module', module_path)
22+
if spec is None or spec.loader is None:
23+
raise RuntimeError(f'Could not load helper module from {module_path}')
24+
25+
module = importlib.util.module_from_spec(spec)
26+
spec.loader.exec_module(module)
27+
return module.clean_text(value)
1328

1429

1530
@step(
1631
id='prepare-text',
1732
requirements=['transformers>=4.0'],
33+
assets=['../assets/sample.txt', '../assets/sample_texts', './sample_module.py'],
1834
compute=ComputeInfo(cpu_limit='500m', cpu_memory='500Mi'),
1935
)
2036
def prepare_text(input_text: str) -> str:
@@ -104,19 +120,65 @@ def test_pipeline_generate_writes_helper_functions_and_expected_files(tmp_path:
104120
assert (tmp_path / 'prepare-text' / 'requirements.txt').exists()
105121
step_script = tmp_path / 'prepare-text' / '1' / 'pipeline_step.py'
106122
assert step_script.exists()
123+
assert (tmp_path / 'prepare-text' / '1' / 'sample.txt').exists()
124+
assert (tmp_path / 'prepare-text' / '1' / 'sample_module.py').exists()
125+
assert (tmp_path / 'prepare-text' / '1' / 'sample_texts' / 'sample1.txt').exists()
107126

108127
step_script_content = step_script.read_text(encoding='utf-8')
109128
requirements_content = (tmp_path / 'prepare-text' / 'requirements.txt').read_text(
110129
encoding='utf-8'
111130
)
112131

113132
assert 'def normalize_text(value: str) -> str:' in step_script_content
133+
assert "with_name('sample_module.py')" in step_script_content
114134
assert '@step' not in step_script_content
115135
assert 'transformers>=4.0' in requirements_content
116136
assert not (tmp_path / 'summarize').exists()
117137
assert not (tmp_path / 'classify-sentiment').exists()
118138

119139

140+
def test_pipeline_generate_raises_for_missing_step_asset(tmp_path: Path):
    """Generation must fail when a step declares an asset path that does not exist."""

    @step(id='missing-asset', assets=['./does-not-exist.txt'])
    def missing_asset_step(input_text: str) -> str:
        return input_text

    with Pipeline(id='asset-pipeline', user_id='me', app_id='my-app') as pipeline:
        text_input = pipeline.input('input_text')
        missing_asset_step(input_text=text_input)

    output_dir = str(tmp_path)
    with pytest.raises(ValueError, match='asset path does not exist'):
        pipeline.generate(output_dir)
151+
152+
153+
def test_pipeline_generate_raises_for_duplicate_asset_basename(tmp_path: Path):
    """Two assets with the same basename in different directories must be rejected."""
    helper_paths = []
    for folder_name in ('a', 'b'):
        folder = tmp_path / folder_name
        folder.mkdir()
        helper = folder / 'helper.py'
        helper.write_text(f'# helper {folder_name}')
        helper_paths.append(str(helper))

    @step(id='dup-asset', assets=helper_paths)
    def dup_asset_step(input_text: str) -> str:
        return input_text

    with Pipeline(id='dup-pipeline', user_id='me', app_id='my-app') as pipeline:
        text_input = pipeline.input('input_text')
        dup_asset_step(input_text=text_input)

    output_dir = tmp_path / 'generated'
    with pytest.raises(ValueError, match='duplicate asset basename'):
        pipeline.generate(str(output_dir))
171+
172+
173+
def test_pipeline_generate_raises_for_reserved_step_asset_name(tmp_path: Path):
    """An asset whose basename is reserved for generated output must be rejected."""
    fixture_path = Path(__file__).with_name('invalid_reserved_asset_pipeline.py')
    pipeline = load_pipeline_from_file(str(fixture_path))
    output_dir = tmp_path / 'generated'

    with pytest.raises(ValueError, match='asset basename is reserved'):
        pipeline.generate(str(output_dir))
180+
181+
120182
def test_validator_collects_only_managed_steps_without_versions():
121183
pipeline = build_pipeline()
122184

0 commit comments

Comments
 (0)