Skip to content

Commit 382241a

Browse files
authored
refactor: Update AWS Glue Databrew StartJobRun step to use integration pattern input (#176)
1 parent aafdb76 commit 382241a

File tree

2 files changed

+51
-22
lines changed

2 files changed

+51
-22
lines changed

src/stepfunctions/steps/service.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,14 @@ class GlueDataBrewStartJobRunStep(Task):
550550
Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
551551
"""
552552

553-
def __init__(self, state_id, wait_for_completion=True, **kwargs):
553+
def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs):
554554
"""
555555
Args:
556556
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
557+
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. Supported integration patterns (default: WaitForCompletion):
558+
559+
* WaitForCompletion: Wait for the DataBrew job to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync>`_ for more details.)
560+
* CallAndContinue: Call StartJobRun and progress to the next state. (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default>`_ for more details.)
557561
comment (str, optional): Human-readable comment or description. (default: None)
558562
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
559563
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
@@ -563,23 +567,18 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
563567
parameters (dict, optional): The value of this field becomes the effective input for the state.
564568
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
565569
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
566-
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
567570
"""
568-
if wait_for_completion:
569-
"""
570-
Example resource arn: arn:aws:states:::databrew:startJobRun.sync
571-
"""
572-
573-
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
574-
GlueDataBrewApi.StartJobRun,
575-
IntegrationPattern.WaitForCompletion)
576-
else:
577-
"""
578-
Example resource arn: arn:aws:states:::databrew:startJobRun
579-
"""
571+
supported_integ_patterns = [IntegrationPattern.WaitForCompletion, IntegrationPattern.CallAndContinue]
580572

581-
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
582-
GlueDataBrewApi.StartJobRun)
573+
is_integration_pattern_valid(integration_pattern, supported_integ_patterns)
574+
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
575+
GlueDataBrewApi.StartJobRun,
576+
integration_pattern)
577+
"""
578+
Example resource arns:
579+
- CallAndContinue: arn:aws:states:::databrew:startJobRun
580+
- WaitForCompletion: arn:aws:states:::databrew:startJobRun.sync
581+
"""
583582

584583
super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs)
585584

tests/unit/test_service_steps.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -677,8 +677,8 @@ def test_emr_modify_instance_group_by_name_step_creation():
677677

678678

679679
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
680-
def test_databrew_start_job_run_step_creation_sync():
681-
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={
680+
def test_databrew_start_job_run_step_creation_default():
681+
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - default', parameters={
682682
"Name": "MyWorkflowJobRun"
683683
})
684684

@@ -693,10 +693,30 @@ def test_databrew_start_job_run_step_creation_sync():
693693

694694

695695
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
696-
def test_databrew_start_job_run_step_creation():
697-
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={
698-
"Name": "MyWorkflowJobRun"
699-
})
696+
def test_databrew_start_job_run_step_creation_wait_for_completion():
697+
step = GlueDataBrewStartJobRunStep(
698+
'Start Glue DataBrew Job Run - WaitForCompletion', integration_pattern=IntegrationPattern.WaitForCompletion,
699+
parameters={
700+
"Name": "MyWorkflowJobRun"
701+
})
702+
703+
assert step.to_dict() == {
704+
'Type': 'Task',
705+
'Resource': 'arn:aws:states:::databrew:startJobRun.sync',
706+
'Parameters': {
707+
'Name': 'MyWorkflowJobRun'
708+
},
709+
'End': True
710+
}
711+
712+
713+
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
714+
def test_databrew_start_job_run_step_creation_call_and_continue():
715+
step = GlueDataBrewStartJobRunStep(
716+
'Start Glue DataBrew Job Run - CallAndContinue',
717+
integration_pattern=IntegrationPattern.CallAndContinue, parameters={
718+
"Name": "MyWorkflowJobRun"
719+
})
700720

701721
assert step.to_dict() == {
702722
'Type': 'Task',
@@ -708,6 +728,16 @@ def test_databrew_start_job_run_step_creation():
708728
}
709729

710730

731+
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
732+
def test_databrew_start_job_run_step_creation_wait_for_task_token_raises_error():
733+
error_message = re.escape(f"Integration Pattern ({IntegrationPattern.WaitForTaskToken.name}) is not supported for this step - "
734+
f"Please use one of the following: "
735+
f"{[IntegrationPattern.WaitForCompletion.name, IntegrationPattern.CallAndContinue.name]}")
736+
with pytest.raises(ValueError, match=error_message):
737+
GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - WaitForTaskToken',
738+
integration_pattern=IntegrationPattern.WaitForTaskToken)
739+
740+
711741
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
712742
def test_eks_create_cluster_step_creation_call_and_continue():
713743
step = EksCreateClusterStep("Create Eks cluster - CallAndContinue", integration_pattern=IntegrationPattern.CallAndContinue,

0 commit comments

Comments
 (0)