Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ def start(
):
param_ds = d

# We can now set the the CONFIGS value in the flow properly. This will overwrite
# anything that may have been passed in by default and we will use exactly what
# We can now set the CONFIGS value in the flow properly. This will overwrite
# anything that may have been passed in by default, and we will use exactly what
# the original flow had. Note that these are accessed through the parameter name
ctx.obj.flow._flow_state[_FlowState.CONFIGS].clear()
d = ctx.obj.flow._flow_state[_FlowState.CONFIGS]
Expand All @@ -471,8 +471,16 @@ def start(
raise ctx.obj.delayed_config_exception

# Init all values in the flow mutators and then process them
for decorator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
decorator.external_init()
for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
mutator.external_init()

# Initialize mutators with top-level options
for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
mutator_options = {
option: deco_options.get(option.replace("-", "_"), option_info["default"])
for option, option_info in mutator.options.items()
}
mutator.flow_init_options(mutator_options)

new_cls = ctx.obj.flow._process_config_decorators(config_options)
if new_cls:
Expand Down
32 changes: 32 additions & 0 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def add_decorator_options(cmd):

seen = {}
existing_params = set(p.name.lower() for p in cmd.params)

# Add decorator options
for deco in flow_decorators(flow_cls):
for option, kwargs in deco.options.items():
Expand All @@ -290,13 +291,43 @@ def add_decorator_options(cmd):
kwargs["envvar"] = "METAFLOW_FLOW_%s" % option.upper()
seen[option] = deco.name
cmd.params.insert(0, click.Option(("--" + option,), **kwargs))

# Add flow mutator options
for mutator in flow_mutators(flow_cls):
for option, kwargs in mutator.options.items():
mutator_name = mutator.__class__.__name__
if option in seen:
msg = (
"Flow mutator '%s' uses an option '%s' which is also "
"used by '%s'. This is a bug in Metaflow. "
"Please file a ticket on GitHub."
% (mutator_name, option, seen[option])
)
raise MetaflowInternalError(msg)
elif mutator_name.lower() in existing_params:
raise MetaflowInternalError(
"Flow mutator '%s' uses an option '%s' which is a reserved "
"keyword. Please use a different option name."
% (mutator_name, option)
)
else:
kwargs["envvar"] = "METAFLOW_FLOW_%s" % option.upper()
seen[option] = mutator_name
cmd.params.insert(0, click.Option(("--" + option,), **kwargs))

return cmd


def flow_decorators(flow_cls):
return [d for deco_list in flow_cls._flow_decorators.values() for d in deco_list]


def flow_mutators(flow_cls):
from metaflow.flowspec import _FlowState

return flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, [])


class StepDecorator(Decorator):
"""
Base class for all step decorators.
Expand Down Expand Up @@ -797,6 +828,7 @@ def _init_step_decorators(
pre_mutate=False,
statically_defined=deco.statically_defined,
inserted_by=inserted_by_value,
mutator=deco,
)
# Sanity check to make sure we are applying the decorator to the right
# class
Expand Down
1 change: 1 addition & 0 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def _process_config_decorators(cls, config_options, process_configs=True):
pre_mutate=True,
statically_defined=deco.statically_defined,
inserted_by=inserted_by_value,
mutator=deco,
)
# Sanity check to make sure we are applying the decorator to the right
# class
Expand Down
63 changes: 63 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import sys
import types
import uuid
import datetime

from typing import Dict, List, Union, Tuple as TTuple
from metaflow.exception import MetaflowException
from metaflow.metaflow_config_funcs import from_conf, get_validate_choice_fn

Expand Down Expand Up @@ -615,6 +618,56 @@ def get_pinned_conda_libs(python_version, datastore_type):
return pins


###
# Runner API type mappings
# Extensions can add custom Click parameter types via get_click_to_python_types
###
def get_click_to_python_types():
"""
Returns the mapping from Click parameter types to Python types for Runner API.
Extensions can override this function to add custom type mappings.
"""
from metaflow._vendor.click.types import (
BoolParamType,
Choice,
DateTime,
File,
FloatParamType,
IntParamType,
Path,
StringParamType,
Tuple,
UUIDParameterType,
)
from metaflow.parameters import JSONTypeClass
from metaflow.includefile import FilePathClass
from metaflow.user_configs.config_options import (
LocalFileInput,
MultipleTuple,
ConfigValue,
)

# Define JSON type for type hints
JSON = Union[Dict[str, "JSON"], List["JSON"], str, int, float, bool, None]

return {
StringParamType: str,
IntParamType: int,
FloatParamType: float,
BoolParamType: bool,
UUIDParameterType: uuid.UUID,
Path: str,
DateTime: datetime.datetime,
Tuple: tuple,
Choice: str,
File: str,
JSONTypeClass: JSON,
FilePathClass: str,
LocalFileInput: str,
MultipleTuple: TTuple[str, Union[JSON, ConfigValue]],
}


# Check if there are extensions to Metaflow to load and override everything
try:
from metaflow.extension_support import get_modules
Expand Down Expand Up @@ -650,6 +703,16 @@ def _new_get_pinned_conda_libs(
if any(" " in x for x in o):
raise ValueError("Decospecs cannot contain spaces")
_TOGGLE_DECOSPECS.extend(o)
elif n == "get_click_to_python_types":
# Extension provides additional Click type mappings for Runner API
# Merge extension's types with base types
def _new_get_click_to_python_types(f1=globals()[n], f2=o):
d1 = f1()
d2 = f2()
d1.update(d2)
return d1

globals()[n] = _new_get_click_to_python_types
elif not n.startswith("__") and not isinstance(o, types.ModuleType):
globals()[n] = o
# If DEFAULT_DECOSPECS is set, use that, else extrapolate from extensions
Expand Down
34 changes: 15 additions & 19 deletions metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,14 @@
)
from metaflow.user_decorators.user_flow_decorator import FlowMutator

# Import Click type mappings from config (allows extensions to add custom types)
from metaflow.metaflow_config import get_click_to_python_types

click_to_python_types = get_click_to_python_types()

# Define a recursive type alias for JSON
JSON = Union[Dict[str, "JSON"], List["JSON"], str, int, float, bool, None]

click_to_python_types = {
StringParamType: str,
IntParamType: int,
FloatParamType: float,
BoolParamType: bool,
UUIDParameterType: uuid.UUID,
Path: str,
DateTime: datetime.datetime,
Tuple: tuple,
Choice: str,
File: str,
JSONTypeClass: JSON,
FilePathClass: str,
LocalFileInput: str,
MultipleTuple: TTuple[str, Union[JSON, ConfigValue]],
}


def _method_sanity_check(
possible_arg_params: TOrderedDict[str, click.Argument],
Expand Down Expand Up @@ -532,8 +520,16 @@ def _compute_flow_parameters(self):
# We ignore any errors if we don't check the configs in the click API.

# Init all values in the flow mutators and then process them
for decorator in self._flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []):
decorator.external_init()
for mutator in self._flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []):
mutator.external_init()

# Initialize mutators with top-level options (using defaults for Deployer/Runner)
for mutator in self._flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []):
mutator_options = {
option: option_info["default"]
for option, option_info in mutator.options.items()
}
mutator.flow_init_options(mutator_options)

new_cls = self._flow_cls._process_config_decorators(
config_options, process_configs=CLICK_API_PROCESS_CONFIG
Expand Down
6 changes: 6 additions & 0 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2074,6 +2074,12 @@ def __init__(
for deco in flow_decorators(self.task.flow):
self.top_level_options.update(deco.get_top_level_options())

# FlowMutators can also define their own top-level options similar to decorators
from metaflow.flowspec import _FlowState

for mutator in self.task.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
self.top_level_options.update(mutator.get_top_level_options())

# We also pass configuration options using the kv.<name> syntax which will cause
# the configuration options to be loaded from the CONFIG file (or local-config-file
# in the case of the local runtime)
Expand Down
35 changes: 35 additions & 0 deletions metaflow/user_decorators/mutable_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ def __init__(
pre_mutate: bool = False,
statically_defined: bool = False,
inserted_by: Optional[str] = None,
mutator: Optional[
"metaflow.user_decorators.user_flow_decorator.FlowMutator"
] = None,
):
self._flow_cls = flow_spec
self._pre_mutate = pre_mutate
self._statically_defined = statically_defined
self._inserted_by = inserted_by
self._mutator = mutator
if self._inserted_by is None:
# This is an error because MutableSteps should only be created by
# StepMutators or FlowMutators. We need to catch it now because otherwise
Expand Down Expand Up @@ -138,6 +142,35 @@ def parameters(
)
yield var, param

@property
def tl_options(self) -> Dict[str, Any]:
"""
Get the top-level CLI options for this mutator.

Returns a dictionary of option names to values that were passed via the CLI.
This allows mutators to access their own top-level options similar to how
they can access configs and parameters.

Example:
```
class MyMutator(FlowMutator):
options = {
'my-option': {'default': 'value', 'help': 'My option'}
}

def pre_mutate(self, mutable_flow):
# Access the option value
val = mutable_flow.tl_options.get('my-option')
print(f'Option value: {val}')
```

Returns
-------
Dict[str, Any]
Dictionary of option names to values
"""
return self._mutator._option_values if self._mutator else {}

@property
def steps(
self,
Expand Down Expand Up @@ -189,6 +222,7 @@ def add_parameter(
"method and not the `mutate` method" % (name, self._inserted_by)
)
from metaflow.parameters import Parameter
from metaflow.flowspec import _FlowState

if hasattr(self._flow_cls, name) and not overwrite:
raise MetaflowException(
Expand All @@ -203,6 +237,7 @@ def add_parameter(
)
debug.userconf_exec("Mutable flow adding parameter %s to flow" % name)
setattr(self._flow_cls, name, value)
self._flow_cls._flow_state.pop(_FlowState.CACHED_PARAMETERS, None)

def remove_parameter(self, parameter_name: str) -> bool:
"""
Expand Down
1 change: 1 addition & 0 deletions metaflow/user_decorators/mutable_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
pre_mutate=pre_mutate,
statically_defined=statically_defined,
inserted_by=inserted_by,
mutator=None, # Step mutators don't have top-level options yet
)
self._flow_cls = flow_spec.__class__
self._my_step = step
Expand Down
25 changes: 24 additions & 1 deletion metaflow/user_decorators/user_flow_decorator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional, Union, TYPE_CHECKING
from typing import Dict, Optional, Union, TYPE_CHECKING, Any

from metaflow.exception import MetaflowException
from metaflow.user_configs.config_parameters import (
Expand Down Expand Up @@ -124,6 +124,11 @@ class FlowMutator(metaclass=FlowMutatorMeta):
modify the steps.
"""

# Top-level options that can be specified on the command line
# Format: {'option-name': {'default': value, 'help': 'help text', ...}}
# These options will be registered as CLI arguments and passed to the mutator
options = {}

def __init__(self, *args, **kwargs):
from ..flowspec import FlowSpecMeta

Expand Down Expand Up @@ -228,6 +233,24 @@ def external_init(self):
if "init" in self.__class__.__dict__:
self.init(*self._args, **self._kwargs)

def flow_init_options(self, options: Dict[str, Any]):
"""
Called to initialize the mutator with top-level CLI options.

Parameters
----------
options : Dict[str, Any]
Dictionary of option names to values from the CLI
"""
self._option_values = options

def get_top_level_options(self):
"""
Return a list of option-value pairs that correspond to top-level
options that should be passed to subprocesses (tasks).
"""
return list(self._option_values.items())

def pre_mutate(
self, mutable_flow: "metaflow.user_decorators.mutable_flow.MutableFlow"
) -> None:
Expand Down
Loading