Skip to content

Commit 6738436

Browse files
committed
Re-add backward compatible _flow_decorators
1 parent 0ea5d80 commit 6738436

File tree

7 files changed

+40
-46
lines changed

7 files changed

+40
-46
lines changed

metaflow/flowspec.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,11 @@ def script_name(self) -> str:
326326
fname = fname[:-1]
327327
return os.path.basename(fname)
328328

329+
@property
330+
def _flow_decorators(self):
331+
# Backward compatible method to access flow decorators
332+
return self._flow_state[FlowStateItems.FLOW_DECORATORS]
333+
329334
@classmethod
330335
def _check_parameters(cls, config_parameters=False):
331336
seen = set()

metaflow/plugins/airflow/airflow.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from metaflow import current
1111
from metaflow.decorators import flow_decorators
1212
from metaflow.exception import MetaflowException
13-
from metaflow.flowspec import FlowStateItems
1413
from metaflow.includefile import FilePathClass
1514
from metaflow.metaflow_config import (
1615
AIRFLOW_KUBERNETES_CONN_ID,
@@ -151,7 +150,7 @@ def save_deployment_token(cls, owner, name, token, flow_datastore):
151150
def _get_schedule(self):
152151
# Using the cron presets provided here :
153152
# https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html?highlight=schedule%20interval#cron-presets
154-
schedule = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
153+
schedule = self.flow._flow_decorators.get("schedule")
155154
if not schedule:
156155
return None
157156
schedule = schedule[0]
@@ -634,11 +633,10 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
634633
return cmds
635634

636635
def _collect_flow_sensors(self):
637-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
638636
decos_lists = [
639-
flow_decos.get(s.name)
637+
self.flow._flow_decorators.get(s.name)
640638
for s in SUPPORTED_SENSORS
641-
if flow_decos.get(s.name) is not None
639+
if self.flow._flow_decorators.get(s.name) is not None
642640
]
643641
af_tasks = [deco.create_task() for decos in decos_lists for deco in decos]
644642
if len(af_tasks) > 0:
@@ -652,14 +650,15 @@ def _contains_foreach(self):
652650
return False
653651

654652
def compile(self):
655-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
656-
if flow_decos.get("trigger") or flow_decos.get("trigger_on_finish"):
653+
if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
654+
"trigger_on_finish"
655+
):
657656
raise AirflowException(
658657
"Deploying flows with @trigger or @trigger_on_finish decorator(s) "
659658
"to Airflow is not supported currently."
660659
)
661660

662-
if flow_decos.get("exit_hook"):
661+
if self.flow._flow_decorators.get("exit_hook"):
663662
raise AirflowException(
664663
"Deploying flows with the @exit_hook decorator "
665664
"to Airflow is not currently supported."

metaflow/plugins/airflow/airflow_cli.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from metaflow import current, decorators
88
from metaflow._vendor import click
99
from metaflow.exception import MetaflowException, MetaflowInternalError
10-
from metaflow.flowspec import FlowStateItems
1110
from metaflow.metaflow_config import FEAT_ALWAYS_UPLOAD_CODE_PACKAGE
1211
from metaflow.package import MetaflowPackage
1312
from metaflow.plugins.aws.step_functions.production_token import (
@@ -418,7 +417,7 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout):
418417
)
419418
)
420419

421-
schedule = flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
420+
schedule = flow._flow_decorators.get("schedule")
422421
if not schedule:
423422
return
424423

metaflow/plugins/argo/argo_workflows.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from metaflow import JSONType, current
1313
from metaflow.decorators import flow_decorators
1414
from metaflow.exception import MetaflowException
15-
from metaflow.flowspec import FlowStateItems
1615
from metaflow.graph import FlowGraph
1716
from metaflow.includefile import FilePathClass
1817
from metaflow.metaflow_config import (
@@ -457,7 +456,7 @@ def _base_kubernetes_annotations(self):
457456
return annotations
458457

459458
def _get_schedule(self):
460-
schedule = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
459+
schedule = self.flow._flow_decorators.get("schedule")
461460
if schedule:
462461
# Remove the field "Year" if it exists
463462
schedule = schedule[0]
@@ -483,15 +482,14 @@ def schedule(self):
483482

484483
def trigger_explanation(self):
485484
# Trigger explanation for cron workflows
486-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
487-
if flow_decos.get("schedule"):
485+
if self.flow._flow_decorators.get("schedule"):
488486
return (
489487
"This workflow triggers automatically via the CronWorkflow *%s*."
490488
% self.name
491489
)
492490

493491
# Trigger explanation for @trigger
494-
elif flow_decos.get("trigger"):
492+
elif self.flow._flow_decorators.get("trigger"):
495493
return (
496494
"This workflow triggers automatically when the upstream %s "
497495
"is/are published."
@@ -501,7 +499,7 @@ def trigger_explanation(self):
501499
)
502500

503501
# Trigger explanation for @trigger_on_finish
504-
elif flow_decos.get("trigger_on_finish"):
502+
elif self.flow._flow_decorators.get("trigger_on_finish"):
505503
return (
506504
"This workflow triggers automatically when the upstream %s succeed(s)"
507505
% self.list_to_prose(
@@ -564,10 +562,7 @@ def get_execution(cls, name):
564562

565563
def _process_parameters(self):
566564
parameters = {}
567-
has_schedule = (
568-
self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
569-
is not None
570-
)
565+
has_schedule = self.flow._flow_decorators.get("schedule") is not None
571566
seen = set()
572567
for var, param in self.flow._get_parameters():
573568
# Throw an exception if the parameter is specified twice.
@@ -653,9 +648,10 @@ def _process_triggers(self):
653648
# Impute triggers for Argo Workflow Template specified through @trigger and
654649
# @trigger_on_finish decorators
655650

656-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
657651
# Disallow usage of @trigger and @trigger_on_finish together for now.
658-
if flow_decos.get("trigger") and flow_decos.get("trigger_on_finish"):
652+
if self.flow._flow_decorators.get("trigger") and self.flow._flow_decorators.get(
653+
"trigger_on_finish"
654+
):
659655
raise ArgoWorkflowsException(
660656
"Argo Workflows doesn't support both *@trigger* and "
661657
"*@trigger_on_finish* decorators concurrently yet. Use one or the "
@@ -665,7 +661,7 @@ def _process_triggers(self):
665661
options = None
666662

667663
# @trigger decorator
668-
if flow_decos.get("trigger"):
664+
if self.flow._flow_decorators.get("trigger"):
669665
# Parameters are not duplicated, and exist in the flow. Additionally,
670666
# convert them to lower case since Metaflow parameters are case
671667
# insensitive.
@@ -678,7 +674,7 @@ def _process_triggers(self):
678674
if not param.IS_CONFIG_PARAMETER
679675
]
680676
)
681-
trigger_deco = flow_decos.get("trigger")[0]
677+
trigger_deco = self.flow._flow_decorators.get("trigger")[0]
682678
trigger_deco.format_deploytime_value()
683679
for event in trigger_deco.triggers:
684680
parameters = {}
@@ -710,17 +706,19 @@ def _process_triggers(self):
710706
parameters[key.lower()] = value
711707
event["parameters"] = parameters
712708
event["type"] = "event"
713-
triggers.extend(flow_decos.get("trigger")[0].triggers)
709+
triggers.extend(self.flow._flow_decorators.get("trigger")[0].triggers)
714710

715711
# Set automatic parameter mapping iff only a single event dependency is
716712
# specified with no explicit parameter mapping.
717713
if len(triggers) == 1 and not triggers[0].get("parameters"):
718714
triggers[0]["parameters"] = dict(zip(params, params))
719-
options = flow_decos.get("trigger")[0].options
715+
options = self.flow._flow_decorators.get("trigger")[0].options
720716

721717
# @trigger_on_finish decorator
722-
if flow_decos.get("trigger_on_finish"):
723-
trigger_on_finish_deco = flow_decos.get("trigger_on_finish")[0]
718+
if self.flow._flow_decorators.get("trigger_on_finish"):
719+
trigger_on_finish_deco = self.flow._flow_decorators.get(
720+
"trigger_on_finish"
721+
)[0]
724722
trigger_on_finish_deco.format_deploytime_value()
725723
for event in trigger_on_finish_deco.triggers:
726724
# Actual filters are deduced here since we don't have access to
@@ -759,7 +757,7 @@ def _process_triggers(self):
759757
"flow": event["flow"],
760758
}
761759
)
762-
options = flow_decos.get("trigger_on_finish")[0].options
760+
options = self.flow._flow_decorators.get("trigger_on_finish")[0].options
763761

764762
for event in triggers:
765763
# Assign a sanitized name since we need this at many places to please
@@ -2837,9 +2835,7 @@ def _lifecycle_hooks(self):
28372835
hooks.append(self._pager_duty_change_template())
28382836
hooks.append(self._incident_io_change_template())
28392837

2840-
exit_hook_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get(
2841-
"exit_hook", []
2842-
)
2838+
exit_hook_decos = self.flow._flow_decorators.get("exit_hook", [])
28432839

28442840
for deco in exit_hook_decos:
28452841
hooks.extend(self._lifecycle_hook_from_deco(deco))

metaflow/plugins/aws/step_functions/step_functions.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from metaflow import R
1010
from metaflow.decorators import flow_decorators
1111
from metaflow.exception import MetaflowException
12-
from metaflow.flowspec import FlowStateItems
1312
from metaflow.metaflow_config import (
1413
EVENTS_SFN_ACCESS_IAM_ROLE,
1514
S3_ENDPOINT_URL,
@@ -303,14 +302,15 @@ def get_execution(cls, state_machine_name, name):
303302
raise StepFunctionsException(repr(e))
304303

305304
def _compile(self):
306-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
307-
if flow_decos.get("trigger") or flow_decos.get("trigger_on_finish"):
305+
if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
306+
"trigger_on_finish"
307+
):
308308
raise StepFunctionsException(
309309
"Deploying flows with @trigger or @trigger_on_finish decorator(s) "
310310
"to AWS Step Functions is not supported currently."
311311
)
312312

313-
if flow_decos.get("exit_hook"):
313+
if self.flow._flow_decorators.get("exit_hook"):
314314
raise StepFunctionsException(
315315
"Deploying flows with the @exit_hook decorator "
316316
"to AWS Step Functions is not currently supported."
@@ -484,8 +484,7 @@ def _visit(node, workflow, exit_node=None):
484484
return _visit(self.graph["start"], workflow)
485485

486486
def _cron(self):
487-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
488-
schedule = flow_decos.get("schedule")
487+
schedule = self.flow._flow_decorators.get("schedule")
489488
if schedule:
490489
schedule = schedule[0]
491490
if schedule.timezone is not None:

metaflow/plugins/pypi/conda_decorator.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import tempfile
66

77
from metaflow.decorators import FlowDecorator, StepDecorator
8-
from metaflow.flowspec import FlowStateItems
98
from metaflow.metadata_provider import MetaDatum
109
from metaflow.metaflow_environment import InvalidEnvironmentException
1110
from metaflow.packaging_sys import ContentType
@@ -83,9 +82,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
8382
self.datastore = flow_datastore
8483

8584
# Support flow-level decorator.
86-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
87-
if "conda_base" in flow_decos:
88-
conda_base = flow_decos["conda_base"][0]
85+
if "conda_base" in self.flow._flow_decorators:
86+
conda_base = self.flow._flow_decorators["conda_base"][0]
8987
super_attributes = conda_base.attributes
9088
self.attributes["packages"] = {
9189
**super_attributes["packages"],

metaflow/plugins/pypi/pypi_decorator.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from metaflow.decorators import FlowDecorator, StepDecorator
2-
from metaflow.flowspec import FlowStateItems
32
from metaflow.metaflow_environment import InvalidEnvironmentException
43

54

@@ -41,9 +40,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
4140
self.step = step
4241

4342
# Support flow-level decorator
44-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
45-
if "pypi_base" in flow_decos:
46-
pypi_base = flow_decos["pypi_base"][0]
43+
if "pypi_base" in self.flow._flow_decorators:
44+
pypi_base = self.flow._flow_decorators["pypi_base"][0]
4745
super_attributes = pypi_base.attributes
4846
self._attributes_with_user_values.update(
4947
pypi_base._attributes_with_user_values

0 commit comments

Comments
 (0)