Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 141 additions & 57 deletions clarifai/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
AliasedGroup,
convert_timestamp_to_string,
display_co_resources,
resolve_id,
validate_context,
)
from clarifai.utils.logging import logger

_DEFAULT_PIPELINE_ID = "hello-world-pipeline"


@cli.group(
['pipeline', 'pl'],
Expand Down Expand Up @@ -285,14 +288,43 @@ def run(
required=False,
help='Initialize from a template (e.g., image-classification, text-prep)',
)
def init(pipeline_path, template):
@click.option('--user_id', required=False, help='User ID for the pipeline.')
@click.option('--app_id', required=False, help='App ID for the pipeline.')
@click.option(
'--pipeline_id',
required=False,
default=_DEFAULT_PIPELINE_ID,
show_default=True,
help='Pipeline ID.',
)
@click.option(
'--steps',
required=False,
multiple=True,
help='Pipeline step names. Can be specified multiple times (e.g., --steps stepA --steps stepB). Ignored when --template is used.',
)
@click.option(
'--num_steps',
required=False,
type=click.IntRange(min=1),
default=2,
show_default=True,
help='Number of pipeline steps to create when --steps is not specified. Ignored when --template or --steps is used.',
)
@click.option(
'--set',
'override_params',
multiple=True,
help='Template parameter overrides. Format: --set key=value. Can be used multiple times. Only used with --template.',
)
def init(pipeline_path, template, user_id, app_id, pipeline_id, steps, num_steps, override_params):
"""Initialize a new pipeline project structure.

Creates a pipeline project structure either from a template or interactively.
Creates a pipeline project structure either from a template or using flag-based inputs.

When using --template, initializes from a predefined template with specific
parameters and structure. Without --template, uses the interactive flow
to create a custom pipeline structure.
parameters and structure. Without --template, creates a custom pipeline structure
using the provided flags.

Creates the following structure in the specified directory:
├── config.yaml # Pipeline configuration
Expand All @@ -309,17 +341,61 @@ def init(pipeline_path, template):
└── README.md # Documentation

PIPELINE_PATH: Path where to create the pipeline project structure. If not specified, the current directory is used by default.

Examples:

# user_id/app_id auto-detected from global config (~/.clarifai/config.yaml)
clarifai pipeline init

# Initialize with explicit IDs and steps
clarifai pipeline init --user_id=my_user --app_id=my_app --pipeline_id=my-pipeline --steps stepA --steps stepB

# Initialize with a specific number of steps
clarifai pipeline init --user_id=my_user --app_id=my_app --pipeline_id=my-pipeline --num_steps=3

# Initialize from a template
clarifai pipeline init --template=image-classification --user_id=my_user --app_id=my_app

# Initialize from a template with custom parameters
clarifai pipeline init --template=image-classification --user_id=my_user --app_id=my_app --set model_name=resnet50
"""
# Resolve user_id and app_id from flag → global config → prompt
user_id = resolve_id(user_id, 'user_id', 'User ID')
app_id = resolve_id(app_id, 'app_id', 'App ID')

# Common setup logic
pipeline_path = _prepare_pipeline_path(pipeline_path, template)
if not pipeline_path:
return # Error already shown in _prepare_pipeline_path

# Resolve step names: explicit --steps take precedence, then generate from --num_steps
if steps:
resolved_steps = [*steps]
else:
default_names = ["stepA", "stepB", "stepC", "stepD", "stepE", "stepF"]
resolved_steps = [
default_names[i] if i < len(default_names) else f"step{i + 1}"
for i in range(num_steps)
]
Comment on lines 306 to +379
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--num_steps currently accepts 0 or negative values, which produces an empty resolved_steps list and generates a pipeline project with no step directories/configs. Since the generated pipeline config’s step_directories and orchestration spec are meant to reference created step folders, this can lead to a non-functional scaffold. Consider enforcing a minimum of 1 (e.g., type=click.IntRange(min=1) or a runtime validation that errors when num_steps < 1).

Copilot uses AI. Check for mistakes.

# Branch to specific initialization method
if template:
success = _init_from_template(pipeline_path, template)
success = _init_from_template(
pipeline_path,
template,
user_id=user_id,
app_id=app_id,
pipeline_id=pipeline_id,
override_params=override_params,
)
else:
success = _init_interactive(pipeline_path)
success = _init_flag_based(
pipeline_path,
user_id=user_id,
app_id=app_id,
pipeline_id=pipeline_id,
step_names=resolved_steps,
)

# Common completion logic
if success:
Expand Down Expand Up @@ -370,20 +446,25 @@ def _show_completion_message(pipeline_path):
logger.info("3. Run 'clarifai pipeline upload config.yaml' to upload your pipeline")


def _init_from_template(pipeline_path, template_name):
def _init_from_template(
pipeline_path, template_name, user_id=None, app_id=None, pipeline_id=None, override_params=None
):
"""Initialize pipeline from a template.

Args:
pipeline_path: Destination path for the pipeline (already prepared)
template_name: Name of the template to use
user_id: User ID for the pipeline (optional, uses placeholder if not provided)
app_id: App ID for the pipeline (optional, uses placeholder if not provided)
pipeline_id: Pipeline ID (optional, defaults to template_name)
override_params: Iterable of "key=value" strings for template parameter overrides

Returns:
bool: True if successful, False otherwise
"""
from clarifai.utils.template_manager import TemplateManager

click.echo("Welcome to Clarifai Pipeline Template Initialization!")
click.echo(f"Using template: {template_name}")
click.echo(f"Initializing pipeline from template: {template_name}")
click.echo()

try:
Expand All @@ -402,41 +483,46 @@ def _init_from_template(pipeline_path, template_name):

parameters = template_info['parameters']
if parameters:
click.echo(f"Parameters: {len(parameters)} required")
click.echo(f"Parameters: {len(parameters)} available")
click.echo()

# Collect basic pipeline information
click.echo("Please provide the following information:")
user_id = click.prompt("User ID", type=str)
app_id = click.prompt("App ID", type=str)

# Use template name as default pipeline ID
default_pipeline_id = template_name
pipeline_id = click.prompt("Pipeline ID", default=default_pipeline_id, type=str)
# user_id and app_id already resolved by the init command caller
effective_user_id = user_id or "your_user_id"
effective_app_id = app_id or "your_app_id"
effective_pipeline_id = (
pipeline_id if pipeline_id and pipeline_id != _DEFAULT_PIPELINE_ID else template_name
)

# Collect template-specific parameters
# Build parameter substitutions from flags
parameter_substitutions = {}

# Parse --set overrides
if override_params:
for param in override_params:
if '=' not in param:
raise ValueError(f"Invalid --set format: '{param}'. Expected key=value.")
key, value = param.split('=', 1)
parameter_substitutions[key] = value

# Warn about template parameters that were not overridden
if parameters:
click.echo("\nTemplate Parameters:")
overridden_keys = set(parameter_substitutions.keys())
for param in parameters:
param_name = param['name']
default_value = param['default_value']

# Format prompt as "param_name (default: value)"
prompt_text = f"{param_name} (default: {default_value})"
value = click.prompt(prompt_text, default=default_value)

# Map parameter name to user's new value for substitution
# Only add to substitutions if the value actually changed
if value != default_value:
parameter_substitutions[param_name] = value
if param_name not in overridden_keys:
default_value = param['default_value']
logger.info(
f"Using default value for template parameter '{param_name}': {default_value}"
)

# Add basic info to parameter substitutions
parameter_substitutions['user_id'] = user_id
parameter_substitutions['app_id'] = app_id
parameter_substitutions['id'] = pipeline_id
parameter_substitutions['user_id'] = effective_user_id
parameter_substitutions['app_id'] = effective_app_id
parameter_substitutions['id'] = effective_pipeline_id

click.echo(f"\nCreating pipeline '{pipeline_id}' from template '{template_name}'...")
click.echo(
f"Creating pipeline '{effective_pipeline_id}' from template '{template_name}'..."
)

# Copy template with substitutions
success = template_manager.copy_template(
Expand All @@ -454,11 +540,17 @@ def _init_from_template(pipeline_path, template_name):
return False


def _init_interactive(pipeline_path):
"""Interactive pipeline initialization (original behavior).
def _init_flag_based(
pipeline_path, user_id=None, app_id=None, pipeline_id=_DEFAULT_PIPELINE_ID, step_names=None
):
"""Flag-based pipeline initialization.

Args:
pipeline_path: Destination path for the pipeline (already prepared)
user_id: User ID for the pipeline (optional, uses placeholder if not provided)
app_id: App ID for the pipeline (optional, uses placeholder if not provided)
pipeline_id: Pipeline ID (default: 'hello-world-pipeline')
step_names: List of pipeline step names (default: ['stepA', 'stepB'])

Returns:
bool: True if successful, False otherwise
Expand All @@ -471,34 +563,26 @@ def _init_interactive(pipeline_path):
get_readme_template,
)

try:
# Prompt for user inputs
click.echo("Welcome to Clarifai Pipeline Initialization!")
click.echo("Please provide the following information:")

user_id = click.prompt("User ID", type=str)
app_id = click.prompt("App ID", type=str)
pipeline_id = click.prompt("Pipeline ID", default="hello-world-pipeline", type=str)
num_steps = click.prompt("Number of pipeline steps", default=2, type=int)
if step_names is None:
step_names = ["stepA", "stepB"]

# Get step names
step_names = []
default_names = ["stepA", "stepB", "stepC", "stepD", "stepE", "stepF"]

for i in range(num_steps):
default_name = default_names[i] if i < len(default_names) else f"step{i + 1}"
step_name = click.prompt(f"Name for step {i + 1}", default=default_name, type=str)
step_names.append(step_name)
# user_id and app_id already resolved by the init command caller
effective_user_id = user_id or "your_user_id"
effective_app_id = app_id or "your_app_id"

click.echo(f"\nCreating pipeline '{pipeline_id}' with steps: {', '.join(step_names)}")
try:
click.echo(f"Creating pipeline '{pipeline_id}' with steps: {', '.join(step_names)}")

# Create pipeline config.yaml
config_path = os.path.join(pipeline_path, "config.yaml")
if os.path.exists(config_path):
logger.warning(f"File {config_path} already exists, skipping...")
else:
config_template = get_pipeline_config_template(
pipeline_id=pipeline_id, user_id=user_id, app_id=app_id, step_names=step_names
pipeline_id=pipeline_id,
user_id=effective_user_id,
app_id=effective_app_id,
step_names=step_names,
)
with open(config_path, 'w', encoding='utf-8') as f:
f.write(config_template)
Expand Down Expand Up @@ -529,7 +613,7 @@ def _init_interactive(pipeline_path):
logger.warning(f"File {step_config_path} already exists, skipping...")
else:
step_config_template = get_pipeline_step_config_template(
step_id=step_id, user_id=user_id, app_id=app_id
step_id=step_id, user_id=effective_user_id, app_id=effective_app_id
)
with open(step_config_path, 'w', encoding='utf-8') as f:
f.write(step_config_template)
Expand Down Expand Up @@ -558,7 +642,7 @@ def _init_interactive(pipeline_path):
return True

except Exception as e:
logger.error(f"Interactive initialization error: {e}")
logger.error(f"Pipeline initialization error: {e}")
click.echo(f"Error: {e}", err=True)
return False

Expand Down
Loading
Loading