-
Notifications
You must be signed in to change notification settings - Fork 8
ADX-141: Refactor pipeline/pipelinestep CLI init commands to flag-based inputs #1010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
18790d2
1ac6a12
427e2b0
0941f36
c83e60f
a110b70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,9 @@ | |
| from clarifai.utils.logging import logger | ||
|
|
||
|
|
||
| _DEFAULT_PIPELINE_ID = "hello-world-pipeline" | ||
|
|
||
|
|
||
| @cli.group( | ||
| ['pipeline', 'pl'], | ||
| cls=AliasedGroup, | ||
|
|
@@ -285,14 +288,43 @@ def run( | |
| required=False, | ||
| help='Initialize from a template (e.g., image-classification, text-prep)', | ||
| ) | ||
| def init(pipeline_path, template): | ||
| @click.option('--user_id', required=False, help='User ID for the pipeline.') | ||
| @click.option('--app_id', required=False, help='App ID for the pipeline.') | ||
| @click.option( | ||
| '--pipeline_id', | ||
| required=False, | ||
| default=_DEFAULT_PIPELINE_ID, | ||
| show_default=True, | ||
| help='Pipeline ID.', | ||
| ) | ||
| @click.option( | ||
| '--steps', | ||
| required=False, | ||
| multiple=True, | ||
| help='Pipeline step names. Can be specified multiple times (e.g., --steps stepA --steps stepB). Ignored when --template is used.', | ||
| ) | ||
| @click.option( | ||
| '--num_steps', | ||
| required=False, | ||
| type=int, | ||
| default=2, | ||
| show_default=True, | ||
| help='Number of pipeline steps to create when --steps is not specified. Ignored when --template or --steps is used.', | ||
| ) | ||
| @click.option( | ||
| '--set', | ||
| 'override_params', | ||
| multiple=True, | ||
| help='Template parameter overrides. Format: --set key=value. Can be used multiple times. Only used with --template.', | ||
| ) | ||
| def init(pipeline_path, template, user_id, app_id, pipeline_id, steps, num_steps, override_params): | ||
| """Initialize a new pipeline project structure. | ||
|
|
||
| Creates a pipeline project structure either from a template or interactively. | ||
| Creates a pipeline project structure either from a template or using flag-based inputs. | ||
|
|
||
| When using --template, initializes from a predefined template with specific | ||
| parameters and structure. Without --template, uses the interactive flow | ||
| to create a custom pipeline structure. | ||
| parameters and structure. Without --template, creates a custom pipeline structure | ||
| using the provided flags. | ||
|
|
||
| Creates the following structure in the specified directory: | ||
| ├── config.yaml # Pipeline configuration | ||
|
|
@@ -309,17 +341,57 @@ def init(pipeline_path, template): | |
| └── README.md # Documentation | ||
|
|
||
| PIPELINE_PATH: Path where to create the pipeline project structure. If not specified, the current directory is used by default. | ||
|
|
||
| Examples: | ||
|
|
||
| # Basic initialization with defaults | ||
| clarifai pipeline init | ||
|
|
||
| # Initialize with explicit IDs and steps | ||
| clarifai pipeline init --user_id=my_user --app_id=my_app --pipeline_id=my-pipeline --steps stepA --steps stepB | ||
|
|
||
| # Initialize with a specific number of steps | ||
| clarifai pipeline init --user_id=my_user --app_id=my_app --pipeline_id=my-pipeline --num_steps=3 | ||
|
|
||
| # Initialize from a template | ||
| clarifai pipeline init --template=image-classification --user_id=my_user --app_id=my_app | ||
|
|
||
| # Initialize from a template with custom parameters | ||
| clarifai pipeline init --template=image-classification --user_id=my_user --app_id=my_app --set model_name=resnet50 | ||
| """ | ||
| # Common setup logic | ||
| pipeline_path = _prepare_pipeline_path(pipeline_path, template) | ||
| if not pipeline_path: | ||
| return # Error already shown in _prepare_pipeline_path | ||
|
|
||
| # Resolve step names: explicit --steps take precedence, then generate from --num_steps | ||
| if steps: | ||
| resolved_steps = [*steps] | ||
| else: | ||
| default_names = ["stepA", "stepB", "stepC", "stepD", "stepE", "stepF"] | ||
| resolved_steps = [ | ||
| default_names[i] if i < len(default_names) else f"step{i + 1}" | ||
| for i in range(num_steps) | ||
| ] | ||
|
|
||
| # Branch to specific initialization method | ||
| if template: | ||
| success = _init_from_template(pipeline_path, template) | ||
| success = _init_from_template( | ||
| pipeline_path, | ||
| template, | ||
| user_id=user_id, | ||
| app_id=app_id, | ||
| pipeline_id=pipeline_id, | ||
| override_params=override_params, | ||
| ) | ||
| else: | ||
| success = _init_interactive(pipeline_path) | ||
| success = _init_flag_based( | ||
| pipeline_path, | ||
| user_id=user_id, | ||
| app_id=app_id, | ||
| pipeline_id=pipeline_id, | ||
| step_names=resolved_steps, | ||
| ) | ||
|
|
||
| # Common completion logic | ||
| if success: | ||
|
|
@@ -370,20 +442,25 @@ def _show_completion_message(pipeline_path): | |
| logger.info("3. Run 'clarifai pipeline upload config.yaml' to upload your pipeline") | ||
|
|
||
|
|
||
| def _init_from_template(pipeline_path, template_name): | ||
| def _init_from_template( | ||
| pipeline_path, template_name, user_id=None, app_id=None, pipeline_id=None, override_params=None | ||
| ): | ||
| """Initialize pipeline from a template. | ||
|
|
||
| Args: | ||
| pipeline_path: Destination path for the pipeline (already prepared) | ||
| template_name: Name of the template to use | ||
| user_id: User ID for the pipeline (optional, uses placeholder if not provided) | ||
| app_id: App ID for the pipeline (optional, uses placeholder if not provided) | ||
| pipeline_id: Pipeline ID (optional, defaults to template_name) | ||
| override_params: Iterable of "key=value" strings for template parameter overrides | ||
|
|
||
| Returns: | ||
| bool: True if successful, False otherwise | ||
| """ | ||
| from clarifai.utils.template_manager import TemplateManager | ||
|
|
||
| click.echo("Welcome to Clarifai Pipeline Template Initialization!") | ||
| click.echo(f"Using template: {template_name}") | ||
| click.echo(f"Initializing pipeline from template: {template_name}") | ||
| click.echo() | ||
|
|
||
| try: | ||
|
|
@@ -402,41 +479,46 @@ def _init_from_template(pipeline_path, template_name): | |
|
|
||
| parameters = template_info['parameters'] | ||
| if parameters: | ||
| click.echo(f"Parameters: {len(parameters)} required") | ||
| click.echo(f"Parameters: {len(parameters)} available") | ||
| click.echo() | ||
|
|
||
| # Collect basic pipeline information | ||
| click.echo("Please provide the following information:") | ||
| user_id = click.prompt("User ID", type=str) | ||
| app_id = click.prompt("App ID", type=str) | ||
|
|
||
| # Use template name as default pipeline ID | ||
| default_pipeline_id = template_name | ||
| pipeline_id = click.prompt("Pipeline ID", default=default_pipeline_id, type=str) | ||
| # Apply defaults for unset values | ||
| effective_user_id = user_id or "your_user_id" | ||
nitinbhojwani marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| effective_app_id = app_id or "your_app_id" | ||
| effective_pipeline_id = ( | ||
| pipeline_id if pipeline_id and pipeline_id != _DEFAULT_PIPELINE_ID else template_name | ||
| ) | ||
|
||
|
|
||
| # Collect template-specific parameters | ||
| # Build parameter substitutions from flags | ||
| parameter_substitutions = {} | ||
|
|
||
| # Parse --set overrides | ||
| if override_params: | ||
| for param in override_params: | ||
| if '=' not in param: | ||
| raise ValueError(f"Invalid --set format: '{param}'. Expected key=value.") | ||
| key, value = param.split('=', 1) | ||
| parameter_substitutions[key] = value | ||
|
|
||
| # Warn about template parameters that were not overridden | ||
| if parameters: | ||
| click.echo("\nTemplate Parameters:") | ||
| overridden_keys = set(parameter_substitutions.keys()) | ||
| for param in parameters: | ||
| param_name = param['name'] | ||
| default_value = param['default_value'] | ||
|
|
||
| # Format prompt as "param_name (default: value)" | ||
| prompt_text = f"{param_name} (default: {default_value})" | ||
| value = click.prompt(prompt_text, default=default_value) | ||
|
|
||
| # Map parameter name to user's new value for substitution | ||
| # Only add to substitutions if the value actually changed | ||
| if value != default_value: | ||
| parameter_substitutions[param_name] = value | ||
| if param_name not in overridden_keys: | ||
| default_value = param['default_value'] | ||
| logger.info( | ||
| f"Using default value for template parameter '{param_name}': {default_value}" | ||
| ) | ||
|
|
||
| # Add basic info to parameter substitutions | ||
| parameter_substitutions['user_id'] = user_id | ||
| parameter_substitutions['app_id'] = app_id | ||
| parameter_substitutions['id'] = pipeline_id | ||
| parameter_substitutions['user_id'] = effective_user_id | ||
| parameter_substitutions['app_id'] = effective_app_id | ||
| parameter_substitutions['id'] = effective_pipeline_id | ||
|
|
||
| click.echo(f"\nCreating pipeline '{pipeline_id}' from template '{template_name}'...") | ||
| click.echo( | ||
| f"Creating pipeline '{effective_pipeline_id}' from template '{template_name}'..." | ||
| ) | ||
|
|
||
| # Copy template with substitutions | ||
| success = template_manager.copy_template( | ||
|
|
@@ -454,11 +536,17 @@ def _init_from_template(pipeline_path, template_name): | |
| return False | ||
|
|
||
|
|
||
| def _init_interactive(pipeline_path): | ||
| """Interactive pipeline initialization (original behavior). | ||
| def _init_flag_based( | ||
| pipeline_path, user_id=None, app_id=None, pipeline_id=_DEFAULT_PIPELINE_ID, step_names=None | ||
| ): | ||
| """Flag-based pipeline initialization. | ||
|
|
||
| Args: | ||
| pipeline_path: Destination path for the pipeline (already prepared) | ||
| user_id: User ID for the pipeline (optional, uses placeholder if not provided) | ||
| app_id: App ID for the pipeline (optional, uses placeholder if not provided) | ||
| pipeline_id: Pipeline ID (default: 'hello-world-pipeline') | ||
| step_names: List of pipeline step names (default: ['stepA', 'stepB']) | ||
|
|
||
| Returns: | ||
| bool: True if successful, False otherwise | ||
|
|
@@ -471,34 +559,25 @@ def _init_interactive(pipeline_path): | |
| get_readme_template, | ||
| ) | ||
|
|
||
| try: | ||
| # Prompt for user inputs | ||
| click.echo("Welcome to Clarifai Pipeline Initialization!") | ||
| click.echo("Please provide the following information:") | ||
|
|
||
| user_id = click.prompt("User ID", type=str) | ||
| app_id = click.prompt("App ID", type=str) | ||
| pipeline_id = click.prompt("Pipeline ID", default="hello-world-pipeline", type=str) | ||
| num_steps = click.prompt("Number of pipeline steps", default=2, type=int) | ||
| if step_names is None: | ||
| step_names = ["stepA", "stepB"] | ||
|
|
||
| # Get step names | ||
| step_names = [] | ||
| default_names = ["stepA", "stepB", "stepC", "stepD", "stepE", "stepF"] | ||
|
|
||
| for i in range(num_steps): | ||
| default_name = default_names[i] if i < len(default_names) else f"step{i + 1}" | ||
| step_name = click.prompt(f"Name for step {i + 1}", default=default_name, type=str) | ||
| step_names.append(step_name) | ||
| effective_user_id = user_id or "your_user_id" | ||
| effective_app_id = app_id or "your_app_id" | ||
|
|
||
| click.echo(f"\nCreating pipeline '{pipeline_id}' with steps: {', '.join(step_names)}") | ||
| try: | ||
| click.echo(f"Creating pipeline '{pipeline_id}' with steps: {', '.join(step_names)}") | ||
|
|
||
| # Create pipeline config.yaml | ||
| config_path = os.path.join(pipeline_path, "config.yaml") | ||
| if os.path.exists(config_path): | ||
| logger.warning(f"File {config_path} already exists, skipping...") | ||
| else: | ||
| config_template = get_pipeline_config_template( | ||
| pipeline_id=pipeline_id, user_id=user_id, app_id=app_id, step_names=step_names | ||
| pipeline_id=pipeline_id, | ||
| user_id=effective_user_id, | ||
| app_id=effective_app_id, | ||
| step_names=step_names, | ||
| ) | ||
| with open(config_path, 'w', encoding='utf-8') as f: | ||
| f.write(config_template) | ||
|
|
@@ -529,7 +608,7 @@ def _init_interactive(pipeline_path): | |
| logger.warning(f"File {step_config_path} already exists, skipping...") | ||
| else: | ||
| step_config_template = get_pipeline_step_config_template( | ||
| step_id=step_id, user_id=user_id, app_id=app_id | ||
| step_id=step_id, user_id=effective_user_id, app_id=effective_app_id | ||
| ) | ||
| with open(step_config_path, 'w', encoding='utf-8') as f: | ||
| f.write(step_config_template) | ||
|
|
@@ -558,7 +637,7 @@ def _init_interactive(pipeline_path): | |
| return True | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Interactive initialization error: {e}") | ||
| logger.error(f"Pipeline initialization error: {e}") | ||
| click.echo(f"Error: {e}", err=True) | ||
| return False | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,7 +46,14 @@ def upload(pipeline_step_path, skip_dockerfile): | |
| required=False, | ||
| default=".", | ||
| ) | ||
| def init(pipeline_step_path): | ||
| @click.option('--user_id', required=False, help='User ID for the pipeline step.') | ||
| @click.option('--app_id', required=False, help='App ID for the pipeline step.') | ||
| @click.option( | ||
| '--step_id', | ||
| required=False, | ||
| help='Pipeline step ID.', | ||
| ) | ||
| def init(pipeline_step_path, user_id, app_id, step_id): | ||
| """Initialize a new pipeline step directory structure. | ||
|
|
||
| Creates the following structure in the specified directory: | ||
|
|
@@ -56,6 +63,17 @@ def init(pipeline_step_path): | |
| └── config.yaml | ||
|
|
||
| PIPELINE_STEP_PATH: Path where to create the pipeline step directory structure. If not specified, the current directory is used by default. | ||
|
|
||
| Examples: | ||
|
|
||
| # Basic initialization with defaults | ||
| clarifai pipelinestep init | ||
|
|
||
| # Initialize with explicit IDs | ||
| clarifai pipelinestep init --user_id=my_user --app_id=my_app --step_id=my-step | ||
|
|
||
| # Initialize in a specific directory | ||
| clarifai pipelinestep init ./my-step --user_id=my_user --app_id=my_app --step_id=my-step | ||
| """ | ||
| from clarifai.cli.templates.pipeline_step_templates import ( | ||
| get_config_template, | ||
|
|
@@ -98,17 +116,30 @@ def init(pipeline_step_path): | |
| if os.path.exists(config_path): | ||
| logger.warning(f"File {config_path} already exists, skipping...") | ||
| else: | ||
| config_template = get_config_template() | ||
| # Pass explicit values; get_config_template uses placeholder defaults for None | ||
| config_template = get_config_template( | ||
| **{ | ||
| k: v | ||
| for k, v in [('step_id', step_id), ('user_id', user_id), ('app_id', app_id)] | ||
| if v | ||
| } | ||
| ) | ||
| with open(config_path, 'w') as f: | ||
| f.write(config_template) | ||
| logger.info(f"Created {config_path}") | ||
|
Comment on lines
+103
to
129
|
||
|
|
||
| logger.info(f"Pipeline step initialization complete in {pipeline_step_path}") | ||
| logger.info("Next steps:") | ||
| logger.info("1. Search for '# TODO: please fill in' comments in the generated files") | ||
| logger.info("2. Update the pipeline step configuration in config.yaml") | ||
| logger.info("3. Add your pipeline step dependencies to requirements.txt") | ||
| logger.info("4. Implement your pipeline step logic in 1/pipeline_step.py") | ||
| has_todos = not (user_id and app_id and step_id) | ||
| if has_todos: | ||
| logger.info("Next steps:") | ||
| logger.info("1. Search for '# TODO: please fill in' comments in the generated files") | ||
| logger.info("2. Update the pipeline step configuration in config.yaml") | ||
| logger.info("3. Add your pipeline step dependencies to requirements.txt") | ||
| logger.info("4. Implement your pipeline step logic in 1/pipeline_step.py") | ||
| else: | ||
| logger.info("Next steps:") | ||
| logger.info("1. Add your pipeline step dependencies to requirements.txt") | ||
| logger.info("2. Implement your pipeline step logic in 1/pipeline_step.py") | ||
|
|
||
|
|
||
| @pipeline_step.command(['ls']) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
--num_stepscurrently accepts 0 or negative values, which produces an emptyresolved_stepslist and generates a pipeline project with no step directories/configs. Since the generated pipeline config’sstep_directoriesand orchestration spec are meant to reference created step folders, this can lead to a non-functional scaffold. Consider enforcing a minimum of 1 (e.g.,type=click.IntRange(min=1)or a runtime validation that errors whennum_steps < 1).