diff --git a/doc/placeholders.rst b/doc/placeholders.rst index fa87b1d..0e818e9 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -5,7 +5,7 @@ Once defined, a workflow is static unless you update it explicitly. But, you can input to workflow executions. You can have dynamic values that you use in the **parameters** fields of the steps in your workflow. For this, the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you -create your workflow. There are 2 mechanisms for passing dynamic values in a workflow. +create your workflow. There are 4 mechanisms for passing dynamic values in a workflow. The first mechanism is a global input to the workflow execution. This input is accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput` @@ -82,6 +82,62 @@ that returns the placeholder output for that step. definition = Chain([lambda_state_first, lambda_state_second]) +The third and fourth mechanisms can be used to access the context object from Map states. +The SDK provides the :py:meth:`stepfunctions.inputs.MapItemIndex` class that allows you to get the index number of the +array item that is being processed in the current iteration and the :py:meth:`stepfunctions.inputs.MapItemValue` class +which is used for accessing the value of the array item that is currently being processed. + +.. code-block:: python + + # Create an instance of MapItemValue class, and define a schema. Defining + # a schema is optional, but it is a good practice + map_item_value = MapItemValue(schema={ + 'name': str, + 'points': str + }) + + map_state = Map( + 'MapState', + parameters={ + "Ranking": MapItemIndex(), + "Contestant": map_item_value['name'], + "Score": map_item_value['points'] + } + ) + iterator_state = Pass('TrainIterator') + map_state.attach_iterator(iterator_state) + + # Workflow is created with the placeholders + workflow = Workflow( + name='MyMapWorkflow', + definition=map_state, + role=workflow_execution_role, + ) + + # Create the workflow on AWS Step Functions + workflow.create() + + # The placeholder is assigned a value during execution. The SDK will + # verify that all placeholder values are assigned values, and that + # these values are of the expected type based on the defined schema + # before the execution starts. + workflow_input = execution_input = [{"name": "John", "points": "101"}, {"name": "Snow", "points": "99"}] + workflow.execute(inputs=workflow_input) + + # The execution output will be: + [ + { + "Ranking": 0, + "Contestant": "John", + "Score": "101", + }, + { + "Ranking": 1, + "Contestant": "Snow", + "Score": "99" + } + ] + .. autoclass:: stepfunctions.inputs.Placeholder @@ -90,3 +146,9 @@ that returns the placeholder output for that step. .. autoclass:: stepfunctions.inputs.StepInput :inherited-members: + +.. autoclass:: stepfunctions.inputs.MapItemValue + :inherited-members: + +.. autoclass:: stepfunctions.inputs.MapItemIndex + :inherited-members: diff --git a/src/stepfunctions/inputs/__init__.py b/src/stepfunctions/inputs/__init__.py index ffa01b0..81fe344 100644 --- a/src/stepfunctions/inputs/__init__.py +++ b/src/stepfunctions/inputs/__init__.py @@ -12,4 +12,4 @@ # permissions and limitations under the License. from __future__ import absolute_import -from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput +from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, MapItemIndex, MapItemValue, StepInput diff --git a/src/stepfunctions/inputs/placeholders.py b/src/stepfunctions/inputs/placeholders.py index 3b7f2b6..ed81251 100644 --- a/src/stepfunctions/inputs/placeholders.py +++ b/src/stepfunctions/inputs/placeholders.py @@ -279,7 +279,7 @@ class StepInput(Placeholder): def __init__(self, schema=None, **kwargs): super(StepInput, self).__init__(schema, **kwargs) self.json_str_template = '${}' - + def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Step Input. @@ -291,3 +291,44 @@ def _create_variable(self, name, parent, type=None): return StepInput(name=name, parent=parent, type=type) else: return StepInput(name=name, parent=parent) + + +class MapItemValue(Placeholder): + """ + Top-level class for map item value placeholders. + """ + + def __init__(self, schema=None, **kwargs): + super(MapItemValue, self).__init__(schema, **kwargs) + self.json_str_template = '$$.Map.Item.Value{}' + + def _create_variable(self, name, parent, type=None): + """ + Creates a placeholder variable for Map Item Value. + A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + """ + if self.immutable: + raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection: " + f" {self.schema}") + if type: + return MapItemValue(name=name, parent=parent, type=type) + else: + return MapItemValue(name=name, parent=parent) + + +class MapItemIndex(Placeholder): + """ + Top-level class for map item index placeholders. + """ + + def __init__(self, **kwargs): + if kwargs.get('schema'): + raise AttributeError("MapItemIndex does not support schema object") + super(MapItemIndex, self).__init__(**kwargs) + self.json_str_template = '$$.Map.Item.Index' + + def _create_variable(self, name, parent, type=None): + raise AttributeError("MapItemIndex has no _create_variable object") + + def __getitem__(self, item): + raise AttributeError("MapItemIndex has no __getitem__ object") diff --git a/src/stepfunctions/steps/compute.py b/src/stepfunctions/steps/compute.py index 203ed47..6cf6f85 100644 --- a/src/stepfunctions/steps/compute.py +++ b/src/stepfunctions/steps/compute.py @@ -55,10 +55,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: @@ -96,10 +96,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_completion: """ @@ -136,10 +136,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_completion: """ @@ -176,10 +176,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_completion: """ diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 986217c..cd65bce 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -90,10 +90,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -120,10 +120,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: @@ -160,10 +160,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -190,10 +190,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -220,10 +220,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -249,10 +249,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -288,10 +288,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -327,10 +327,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -366,10 +366,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -405,10 +405,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -444,10 +444,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -483,10 +483,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -522,10 +522,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -553,10 +553,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -594,10 +594,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: """ @@ -634,10 +634,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: """ @@ -672,10 +672,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -711,10 +711,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -750,10 +750,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -789,10 +789,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -819,10 +819,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -849,10 +849,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -879,10 +879,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 8396e69..e8de073 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -73,11 +73,31 @@ def to_dict(self): k = to_pascalcase(k) if k == to_pascalcase(Field.Parameters.value): result[k] = self._replace_placeholders(v) + elif self._is_placeholder_compatible(k) and isinstance(v, Placeholder): + result[k] = v.to_jsonpath() + else: result[k] = v return result + @staticmethod + def _is_placeholder_compatible(field): + """ + Check if the field is placeholder compatible + Args: + field: Field against which to verify placeholder compatibility + """ + return field in [ + # Common fields + to_pascalcase(Field.InputPath.value), + to_pascalcase(Field.OutputPath.value), + + # Map + to_pascalcase(Field.ItemsPath.value) + ] + + def to_json(self, pretty=False): """Serialize to a JSON formatted string. @@ -169,10 +189,10 @@ def __init__(self, state_id, state_type, output_schema=None, **kwargs): state_type (str): Type of the state. (Allowed values: `'Pass'`, `'Succeed'`, `'Fail'`, `'Wait'`, `'Task'`, `'Choice'`, `'Parallel'`, `'Map'`). output_schema (dict): Expected output schema for the State. This is used to validate placeholder inputs used by the next state in the state machine. (default: None) comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(State, self).__init__(**kwargs) self.fields['type'] = state_type @@ -306,11 +326,11 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') result (str, optional): If present, its value is treated as the output of a virtual task, and placed as prescribed by the `result_path` field, if any, to be passed on to the next state. If `result` is not provided, the output is the input. - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Pass, self).__init__(state_id, 'Pass', **kwargs) @@ -337,8 +357,8 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') - output_path (str, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Succeed, self).__init__(state_id, 'Succeed', **kwargs) @@ -391,8 +411,8 @@ def __init__(self, state_id, **kwargs): timestamp (str): Absolute expiry time, specified as an ISO-8601 extended offset date-time format string. timestamp_path (str): Path applied to the state's input to select the timestamp to be used for wait duration. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') - output_path (str, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Wait, self).__init__(state_id, 'Wait', **kwargs) if len([v for v in (self.seconds, self.timestamp, self.timestamp_path, self.seconds_path) if v is not None]) != 1: @@ -421,8 +441,8 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') - output_path (str, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Choice, self).__init__(state_id, 'Choice', **kwargs) self.choices = [] @@ -496,10 +516,10 @@ def __init__(self, state_id, retry=None, catch=None, **kwargs): retry (Retry or list(Retry), optional): A retrier or list of retriers that define the state's retry policy. See `Error handling in Step Functions `_ for more details. catch (Catch or list(Catch), optional): A catcher or list of catchers that define a fallback state. See `Error handling in Step Functions `_ for more details. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Parallel, self).__init__(state_id, 'Parallel', **kwargs) self.branches = [] @@ -553,13 +573,13 @@ def __init__(self, state_id, retry=None, catch=None, **kwargs): iterator (State or Chain): State or chain to execute for each of the items in `items_path`. retry (Retry or list(Retry), optional): A retrier or list of retriers that define the state's retry policy. See `Error handling in Step Functions `_ for more details. catch (Catch or list(Catch), optional): A catcher or list of catchers that define a fallback state. See `Error handling in Step Functions `_ for more details. - items_path (str, optional): Path in the input for items to iterate over. (default: '$') + items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$') max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0) comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Map, self).__init__(state_id, 'Map', **kwargs) @@ -616,10 +636,10 @@ def __init__(self, state_id, retry=None, catch=None, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Task, self).__init__(state_id, 'Task', **kwargs) if self.timeout_seconds is not None and self.timeout_seconds_path is not None: diff --git a/tests/integ/test_state_machine_definition.py b/tests/integ/test_state_machine_definition.py index 4881b75..0bb0aeb 100644 --- a/tests/integ/test_state_machine_definition.py +++ b/tests/integ/test_state_machine_definition.py @@ -18,6 +18,7 @@ from sagemaker.utils import unique_name_from_base from sagemaker.image_uris import retrieve from stepfunctions import steps +from stepfunctions.inputs import ExecutionInput, MapItemIndex, MapItemValue from stepfunctions.workflow import Workflow from stepfunctions.steps.utils import get_aws_partition from tests.integ.utils import state_machine_delete_wait @@ -288,6 +289,80 @@ def test_map_state_machine_creation(sfn_client, sfn_role_arn): workflow_test_suite(sfn_client, workflow, asl_state_machine_definition, map_state_result, state_machine_input) +def test_map_state_machine_creation_with_placeholders(sfn_client, sfn_role_arn): + map_item_value = MapItemValue(schema={ + 'name': str, + 'age': str + }) + + execution_input = ExecutionInput() + + map_state_name = "Map State" + iterated_state_name = "Pass State" + final_state_name = "Final State" + max_concurrency = 0 + map_state_result = "Map Result" + state_machine_input = { + "items_path": [{"name": "John", "age": 21}, {"name": "Snow", "age": 18}] + } + + asl_state_machine_definition = { + "StartAt": map_state_name, + "States": { + map_state_name: { + "ItemsPath": "$$.Execution.Input['items_path']", + "Iterator": { + "StartAt": iterated_state_name, + "States": { + iterated_state_name: { + "Type": "Pass", + "End": True + } + } + }, + "MaxConcurrency": max_concurrency, + "Type": "Map", + "Next": final_state_name, + "Parameters": { + "Age.$": "$$.Map.Item.Value['age']", + "MapIndex.$": "$$.Map.Item.Index", + "Name.$": "$$.Map.Item.Value['name']" + }, + }, + final_state_name: { + "Type": "Pass", + "Result": map_state_result, + "End": True + } + } + } + + map_state = steps.Map( + map_state_name, + items_path=execution_input['items_path'], + iterator=steps.Pass(iterated_state_name), + max_concurrency=max_concurrency, + parameters={ + "MapIndex": MapItemIndex(), + "Name": map_item_value['name'], + "Age": map_item_value['age'] + } + ) + + definition = steps.Chain([ + map_state, + steps.Pass(final_state_name, result=map_state_result) + ]) + + workflow = Workflow( + unique_name_from_base('Test_Map_Workflow_With_Placeholders'), + definition=definition, + role=sfn_role_arn + ) + + workflow_test_suite(sfn_client, workflow, asl_state_machine_definition, map_state_result, state_machine_input) + + def test_choice_state_machine_creation(sfn_client, sfn_role_arn): choice_state_name = "ChoiceState" first_match_name = "FirstMatchState" diff --git a/tests/unit/test_path_placeholders_with_steps.py b/tests/unit/test_path_placeholders_with_steps.py new file mode 100644 index 0000000..6bbed03 --- /dev/null +++ b/tests/unit/test_path_placeholders_with_steps.py @@ -0,0 +1,41 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +from __future__ import absolute_import + +import boto3 +import pytest +from unittest.mock import patch + +from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.steps import Pass, Succeed, Wait, Choice, Parallel, Task, Map +from stepfunctions.steps.states import State + +@pytest.mark.parametrize("state, state_id, extra_args", [ + # States + (State, "State", {'state_type': 'Void'}), + (Pass, "PassState", {}), + (Choice, "ChoiceState", {}), + (Succeed, "SucceedState", {}), + (Parallel, "ParallelState", {}), + (Task, "TaskState", {}), + (Wait, "WaitState", {'seconds': 10}), + (Map, "MapState", {'iterator': Pass('PassState')}) +]) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_service_step_creation_with_placeholders(state, state_id, extra_args): + execution_input = ExecutionInput(schema={'input_path': str}) + step_input = StepInput(schema={'output_path': str}) + step = state(state_id, input_path=execution_input['input_path'], output_path=step_input['output_path'], **extra_args) + + assert step.to_dict()['InputPath'] == "$$.Execution.Input['input_path']" + assert step.to_dict()['OutputPath'] == "$['output_path']" diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index 456a7bf..4f543da 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -15,7 +15,7 @@ import pytest import json -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, MapItemIndex, MapItemValue, StepInput def test_placeholder_creation_with_subscript_operator(): step_input = StepInput() @@ -178,4 +178,189 @@ def check_immutable(placeholder): for k, v in placeholder.store.items(): return check_immutable(v) else: - return False \ No newline at end of file + return False + + +def test_map_item_value_creation_with_subscript_operator(): + map_item_placeholder = MapItemValue() + map_item_placeholder = map_item_placeholder["A"] + assert map_item_placeholder.name == "A" + assert map_item_placeholder.type is None + + +def test_map_item_index_creation_with_subscript_operator(): + map_item_placeholder = MapItemIndex() + with pytest.raises(AttributeError): + map_item_placeholder["A"] + assert not map_item_placeholder.get_schema_as_dict() + assert not map_item_placeholder.immutable + + +def test_map_item_value_creation_with_type(): + map_item_placeholder = MapItemValue() + map_item_variable = map_item_placeholder["A"]["b"].get("C", float) + assert map_item_variable.name == "C" + assert map_item_variable.type == float + + +def test_map_item_value_creation_with_int_key(): + map_item_placeholder = MapItemValue() + map_item_variable = map_item_placeholder["A"][0] + assert map_item_variable.name == 0 + assert map_item_variable.type is None + + +def test_map_item_value_creation_with_invalid_key(): + map_item_placeholder = MapItemValue() + with pytest.raises(ValueError): + map_item_placeholder["A"][1.3] + with pytest.raises(ValueError): + map_item_placeholder["A"].get(1.2, str) + + +def test_map_item_value_creation_failure_with_type(): + map_item_placeholder = MapItemValue() + map_item_placeholder["A"]["b"].get("C", float) + with pytest.raises(ValueError): + map_item_placeholder["A"]["b"].get("C", int) + + +def test_map_item_value_path(): + map_item_placeholder = MapItemValue() + placeholder_variable = map_item_placeholder["A"]["b"]["C"] + expected_path = ["A", "b", "C"] + assert placeholder_variable._get_path() == expected_path + + +def test_map_item_value_contains(): + map_item_placeholder = MapItemValue() + var_three = map_item_placeholder["Key01"]["Key04"] + + map_item_placeholder_two = StepInput() + var_five = map_item_placeholder_two["Key07"] + + assert map_item_placeholder.contains(var_three) == True + assert map_item_placeholder.contains(var_five) == False + assert map_item_placeholder_two.contains(var_three) == False + assert map_item_placeholder_two.contains(var_five) == True + + +def test_map_item_value_schema_as_dict(): + map_item_placeholder = MapItemValue() + map_item_placeholder["A"]["b"].get("C", float) + map_item_placeholder["Message"] + map_item_placeholder["Key01"]["Key02"] + map_item_placeholder["Key03"] + map_item_placeholder["Key03"]["Key04"] + + expected_schema = { + "A": { + "b": { + "C": float + } + }, + "Message": str, + "Key01": { + "Key02": str + }, + "Key03": { + "Key04": str + } + } + + assert map_item_placeholder.get_schema_as_dict() == expected_schema + + +def test_map_item_value_schema_as_json(): + map_item_placeholder = MapItemValue() + map_item_placeholder["Response"].get("StatusCode", int) + map_item_placeholder["Hello"]["World"] + map_item_placeholder["A"] + map_item_placeholder["Hello"]["World"].get("Test", str) + + expected_schema = { + "Response": { + "StatusCode": "int" + }, + "Hello": { + "World": { + "Test": "str" + } + }, + "A": "str" + } + + assert map_item_placeholder.get_schema_as_json() == json.dumps(expected_schema) + + +def test_map_item_value_is_empty(): + workflow_input = MapItemValue() + placeholder_variable = workflow_input["A"]["B"]["C"] + assert placeholder_variable._is_empty() == True + workflow_input["A"]["B"]["C"]["D"] + assert placeholder_variable._is_empty() == False + + +def test_map_item_value_make_immutable(): + workflow_input = MapItemValue() + workflow_input["A"]["b"].get("C", float) + workflow_input["Message"] + workflow_input["Key01"]["Key02"] + workflow_input["Key03"] + workflow_input["Key03"]["Key04"] + + assert check_immutable(workflow_input) == False + + workflow_input._make_immutable() + assert check_immutable(workflow_input) == True + + +def test_map_item_value_with_schema(): + test_schema = { + "A": { + "B": { + "C": int + } + }, + "Request": { + "Status": str + }, + "Hello": float + } + workflow_input = MapItemValue(schema=test_schema) + assert workflow_input.get_schema_as_dict() == test_schema + assert workflow_input.immutable == True + assert workflow_input['A']['B'].get("C", int) + + with pytest.raises(ValueError): + workflow_input["A"]["B"]["D"] + + with pytest.raises(ValueError): + workflow_input["A"]["B"].get("C", float) + + +def test_map_item_index_with_schema(): + test_schema = { + "A": { + "B": { + "C": int + } + }, + "Request": { + "Status": str + }, + "Hello": float + } + with pytest.raises(AttributeError): + workflow_input = MapItemIndex(schema=test_schema) + + +def test_map_item_value_jsonpath(): + map_item_value = MapItemValue() + map_item_value_variable = map_item_value["A"]["b"].get(0, float) + assert map_item_value_variable.to_jsonpath() == "$$.Map.Item.Value['A']['b'][0]" + + +def test_map_item_index_jsonpath(): + map_item_index = MapItemIndex() + assert map_item_index.to_jsonpath() == "$$.Map.Item.Index" diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 54d1543..6a7ef24 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -15,7 +15,7 @@ import pytest from stepfunctions.steps import Pass, Succeed, Fail, Wait, Choice, ChoiceRule, Parallel, Map, Task, Retry, Catch, Chain, Graph -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, MapItemValue, MapItemIndex def test_workflow_input_placeholder(): @@ -176,8 +176,22 @@ def test_step_input_order_validation(): def test_map_state_with_placeholders(): workflow_input = ExecutionInput() + map_item_value = MapItemValue(schema={ + 'name': str, + 'age': str + }) - map_state = Map('MapState01') + map_state = Map( + 'MapState01', + input_path=workflow_input['input_path'], + items_path=workflow_input['items_path'], + output_path=workflow_input['output_path'], + parameters={ + "MapIndex": MapItemIndex(), + "Name": map_item_value['name'], + "Age": map_item_value['age'] + } + ) iterator_state = Pass( 'TrainIterator', parameters={ @@ -194,6 +208,9 @@ def test_map_state_with_placeholders(): "MapState01": { "Type": "Map", "End": True, + "InputPath": "$$.Execution.Input['input_path']", + "ItemsPath": "$$.Execution.Input['items_path']", + "OutputPath": "$$.Execution.Input['output_path']", "Iterator": { "StartAt": "TrainIterator", "States": { @@ -206,6 +223,11 @@ def test_map_state_with_placeholders(): "End": True } } + }, + 'Parameters': { + 'Age.$': "$$.Map.Item.Value['age']", + 'MapIndex.$': '$$.Map.Item.Index', + 'Name.$': "$$.Map.Item.Value['name']" } } } @@ -214,6 +236,7 @@ def test_map_state_with_placeholders(): result = Graph(workflow_definition).to_dict() assert result == expected_repr + def test_parallel_state_with_placeholders(): workflow_input = ExecutionInput()