Description:
Set up an automated Airflow pipeline for data transfer from S3 to GCS to BQ.
A major requirement of the solution is to trigger the DAG daily, so that each day's S3 data is loaded into BigQuery external, Hive-partitioned tables.
Current scenario:
For the first part, we use the "S3ToGoogleCloudStorageOperator", which copies files from the source S3 bucket into the destination GCS bucket. The operator copies only the newly added (incremental) files; behind the scenes it returns the list of new filenames, which is pushed to XCom and which we then pass on to the "BigQueryInsertJobOperator" by pulling that XCom.
This ensures that only the data from the incremental files is appended into the BQ tables.
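For context, here is a minimal sketch of what this pattern looks like as a DAG. The import paths follow the Airflow 2 provider packages, and the connection IDs, bucket/dataset/table names, and the exact BigQuery job configuration are assumptions, since the real DAG is not shown in the question:

```python
# Minimal sketch of the current S3 -> GCS -> BQ pattern. Import paths follow the
# Airflow 2 provider packages; bucket, dataset, and table names are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="s3_to_gcs_to_bq",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    render_template_as_native_obj=True,  # Airflow 2.1+: render the XCom pull as a real list
) as dag:
    # Copies only files that are not yet in GCS (replace=False) and returns the
    # list of newly copied keys; Airflow pushes that return value to XCom.
    s3_to_gcs = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket="my-source-s3-bucket",                # placeholder
        prefix="partner=doubleverify/",              # placeholder
        dest_gcs="gs://my-destination-gcs-bucket/",  # placeholder
        replace=False,
    )

    # Pulls that XCom via a template; in the real DAG the keys would be turned
    # into full gs:// URIs and fed into the load of the Hive-partitioned table.
    load_to_bq = BigQueryInsertJobOperator(
        task_id="load_to_bq",
        configuration={
            "load": {
                "sourceUris": "{{ ti.xcom_pull(task_ids='s3_to_gcs') }}",  # the large XCom
                "destinationTable": {
                    "projectId": "my-project",   # placeholder
                    "datasetId": "my_dataset",   # placeholder
                    "tableId": "my_table",       # placeholder
                },
                "sourceFormat": "PARQUET",
                "writeDisposition": "WRITE_APPEND",
            }
        },
    )

    s3_to_gcs >> load_to_bq
```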
Issue:
This works perfectly fine for most of my S3 buckets; however, for some cases I receive this error:
ERROR - (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1"
[SQL: INSERT INTO xcom (key, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, %s, %s, %s, %s, %s)
I realize this is because of the XCom size limitation: the list of filename strings can exceed it.
An example of the list returned: ["partner=doubleverify/sfmt=v1/table=blk/seat=607671/dt=2021-01-15/file=ATT_BLK_ImpID-20210115-07.csv.gz/part-00000-10065608-4a8e-45e3-99df-3f1c7765ed3f-c000.snappy.parquet", .....500 more elements ]
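A quick back-of-the-envelope check shows why roughly 500 names of this length blow past the limit. The 65,535-byte figure is an assumption based on the error above, i.e. that the xcom.value column is a MySQL BLOB:

```python
# Rough size check (illustrative only): ~500 paths of this length exceed the
# ~64 KB cap of a MySQL BLOB column, which the xcom.value column appears to
# map to on this metadata database (assumption based on the error above).
import json

sample_path = (
    "partner=doubleverify/sfmt=v1/table=blk/seat=607671/dt=2021-01-15/"
    "file=ATT_BLK_ImpID-20210115-07.csv.gz/"
    "part-00000-10065608-4a8e-45e3-99df-3f1c7765ed3f-c000.snappy.parquet"
)
file_list = [sample_path] * 500                  # stand-in for the real list
payload = json.dumps(file_list).encode("utf-8")  # roughly how XCom serializes it

print(len(sample_path))  # ~170 characters per path
print(len(payload))      # ~85,000 bytes, well over the 65,535-byte BLOB limit
```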
Is there any way to override the XCom size limitation? If not, what changes should be made to the DAG architecture so that the pipeline scales and only the newly added files in GCS are identified and loaded into BQ?
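On the first part of the question, one direction that exists in Airflow 2.x is a custom XCom backend that offloads large values to GCS and stores only a short reference in the metadata database. A hedged sketch, assuming Airflow 2.x and the Google provider's GCSHook; the bucket name, size threshold, and class are placeholders, not a tested implementation:

```python
# Sketch of a custom XCom backend (Airflow 2.x) that offloads large values to GCS
# and keeps only a short reference string in the metadata DB. Bucket name and
# size threshold are placeholders; register it via [core] xcom_backend in airflow.cfg.
import json
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSXComBackend(BaseXCom):
    PREFIX = "xcom_gcs::"
    BUCKET = "my-xcom-staging-bucket"  # placeholder
    THRESHOLD = 50_000                 # bytes; placeholder

    @staticmethod
    def serialize_value(value, **kwargs):
        data = json.dumps(value, default=str)
        if len(data.encode("utf-8")) > GCSXComBackend.THRESHOLD:
            # Upload the oversized payload to GCS and keep only its object name.
            object_name = f"xcom/{uuid.uuid4()}.json"
            GCSHook().upload(
                bucket_name=GCSXComBackend.BUCKET,
                object_name=object_name,
                data=data,
            )
            value = GCSXComBackend.PREFIX + object_name  # only this short string hits MySQL
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith(GCSXComBackend.PREFIX):
            # Fetch the real payload back from GCS on xcom_pull.
            object_name = value[len(GCSXComBackend.PREFIX):]
            blob = GCSHook().download(
                bucket_name=GCSXComBackend.BUCKET,
                object_name=object_name,
            )
            value = json.loads(blob)
        return value
```

With something like this in place, the downstream xcom_pull would still see the full file list while the row stored in MySQL stays small; the alternative would be the architectural route of having the transfer step write a manifest of new files to GCS and passing only that manifest path through XCom.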