Skip to content

Commit

Permalink
FIX
Browse files Browse the repository at this point in the history
  • Loading branch information
laksh-krishna-sharma committed Aug 18, 2024
1 parent db2a29a commit ce5bee1
Showing 1 changed file with 26 additions and 22 deletions.
48 changes: 26 additions & 22 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,28 +824,32 @@ def execute(self, context: Context) -> str | None:
job_flow_id=self._job_flow_id,
log_uri=get_log_uri(emr_client=self._emr_hook.conn, job_flow_id=self._job_flow_id),
)
if self.deferrable and self.wait_for_completion:
self.defer(
trigger=EmrCreateJobFlowTrigger(
job_flow_id=self._job_flow_id,
aws_conn_id=self.aws_conn_id,
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
),
method_name="execute_complete",
timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay + 60),
)
if self.wait_for_completion and not self.deferrable:
self._emr_hook.get_waiter("job_flow_waiting").wait(
ClusterId=self._job_flow_id,
WaiterConfig=prune_dict(
{
"Delay": self.waiter_delay,
"MaxAttempts": self.waiter_max_attempts,
}
),
)
return self._job_flow_id
if self.wait_for_completion:
if self.deferrable:
self.defer(
trigger=EmrCreateJobFlowTrigger(
job_flow_id=self._job_flow_id,
aws_conn_id=self.aws_conn_id,
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
),
method_name="execute_complete",
# timeout is set to ensure that if a trigger dies, the timeout does not restart
# 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay + 60),
)
else:
self._emr_hook.get_waiter("job_flow_waiting").wait(
ClusterId=self._job_flow_id,
WaiterConfig=prune_dict(
{
"Delay": self.waiter_delay,
"MaxAttempts": self.waiter_max_attempts,
}
),
)

return self._job_flow_id

def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> str:
event = validate_execute_complete_event(event)
Expand Down

0 comments on commit ce5bee1

Please sign in to comment.