Is there any SnsPublishOperator support for AWS SNS FIFO topics? #21820
Unanswered
raj-gupta271 asked this question in Ideas
-
@raj-gupta271 were you able to solve this? I am running into the same issue.
-
I created a DAG with an SnsPublishOperator task that publishes a custom message to an AWS SNS FIFO topic, but I got this error:
[2022-02-25 12:08:11,163] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/sns.py", line 82, in execute
    message_attributes=self.message_attributes,
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/sns.py", line 94, in publish_to_target
    return self.get_conn().publish(**publish_kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameter) when calling the Publish operation: Invalid parameter: The MessageGroupId parameter is required for FIFO topics
So I concluded that the MessageGroupId parameter is mandatory for FIFO topics, but I couldn't find any documentation on how to pass it to SnsPublishOperator.
I am referring to this doc: https://airflow.apache.org/docs/apache-airflow/1.10.12/_modules/airflow/contrib/operators/sns_publish_operator.html
Here is my DAG code:
`from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.sns_publish_operator import SnsPublishOperator
from datetime import datetime, timedelta
import boto3


def my_func(**kwargs):
    s3 = boto3.resource("s3")
    # Source object description; defined here but not used further in this snippet.
    copy_source = {
        'Bucket': 'vd-prd-smarttvdata-ops-virginia',
        'Key': 'Alarm-Instrumentation/application/src/prd/definition/alarm.yaml'
    }


default_args = {
    'owner': 'abhijeet.raj',
    'start_date': datetime(2022, 2, 9, 0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

dag_name = 'SampleDAGWithSNSPublish'
dag = DAG(dag_name, default_args=default_args, schedule_interval='30 * * * 1-5', max_active_runs=2, catchup=False)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_func,
    provide_context=True,
    dag=dag
)

# Publishes to a FIFO topic (ARN ends in .fifo); no MessageGroupId is passed here,
# which is what triggers the InvalidParameter error above.
send_sms = SnsPublishOperator(
    task_id="send_sms",
    target_arn="arn:aws:sns:us-east-1:833376745199:acr-data-airflow.fifo",
    message="SUCCESS",
    message_attributes={
        "Action": "Loaded",
        "Region": "NA",
        "S3BucketName": "vd-prd-smarttvdata-ops-virginia",
        "S3Prefix": "Alarm-Instrumentation/application/dest/prd/definition/alarm.yaml",
        "execution_time": "{{ ts }}",
        "run_id": "{{ run_id }}"
    },
    aws_conn_id="aws_default",
    dag=dag
)

python_task >> send_sms
`
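One possible workaround, offered only as a sketch: the underlying boto3 SNS `publish` call does accept `MessageGroupId` (and `MessageDeduplicationId`), so the message could be published from a `PythonOperator` callable instead of through `SnsPublishOperator`. The helper names `to_sns_attributes`, `publish_to_fifo_topic`, and `publish_success`, the group name `airflow-alarm-load`, and the choice of `run_id` as the deduplication ID are all assumptions made for illustration and are not part of Airflow. Note that this sketch uses boto3's default credential chain, not the `aws_default` Airflow connection.

```python
from typing import Any, Dict, Optional


def to_sns_attributes(attributes: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Convert plain values into the MessageAttributes shape that SNS Publish expects."""
    converted = {}
    for name, value in attributes.items():
        if isinstance(value, bytes):
            converted[name] = {"DataType": "Binary", "BinaryValue": value}
        elif isinstance(value, (int, float)) and not isinstance(value, bool):
            converted[name] = {"DataType": "Number", "StringValue": str(value)}
        else:
            converted[name] = {"DataType": "String", "StringValue": str(value)}
    return converted


def publish_to_fifo_topic(
    client,
    target_arn: str,
    message: str,
    message_group_id: str,
    message_deduplication_id: Optional[str] = None,
    message_attributes: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Publish to a FIFO topic; `client` is anything with a boto3-style publish()."""
    publish_kwargs = {
        "TopicArn": target_arn,
        "Message": message,
        "MessageGroupId": message_group_id,  # required for FIFO topics
    }
    # Only needed when content-based deduplication is disabled on the topic.
    if message_deduplication_id is not None:
        publish_kwargs["MessageDeduplicationId"] = message_deduplication_id
    if message_attributes:
        publish_kwargs["MessageAttributes"] = to_sns_attributes(message_attributes)
    return client.publish(**publish_kwargs)


def publish_success(**context):
    """Hypothetical python_callable for a PythonOperator task."""
    import boto3  # imported lazily so the helpers above work without boto3 installed

    client = boto3.client("sns", region_name="us-east-1")
    response = publish_to_fifo_topic(
        client,
        target_arn="arn:aws:sns:us-east-1:833376745199:acr-data-airflow.fifo",
        message="SUCCESS",
        message_group_id="airflow-alarm-load",  # hypothetical group name
        message_deduplication_id=str(context["run_id"]),  # assumption: one message per run
        message_attributes={"Action": "Loaded", "Region": "NA"},
    )
    return response.get("MessageId")
```

In the DAG above, `publish_success` could replace the `send_sms` task by being passed as `python_callable` to a second `PythonOperator`. Jinja strings such as `{{ ts }}` are not rendered inside a plain callable, so values like `run_id` are read from the task context instead.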
@kaxil @mistercrunch @XD-DENG