diff --git a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py index 5ab3910..8446bcb 100644 --- a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py @@ -33,6 +33,8 @@ def __init__(self, task, dag): self._glue_schema_name = task.glue_schema_name self._sort_key = task.sort_key self._custom_columns = task.custom_columns + self._input_table_columns_to_include = task.input_table_columns_to_include + self._input_table_columns_to_exclude = task.input_table_columns_to_exclude def _generate_command(self): command = BatchCreator._generate_command(self) @@ -75,7 +77,10 @@ def _generate_command(self): command.append(f"--sort_key={self._sort_key}") if self._custom_columns: command.append(f"--custom_columns={self._custom_columns}") - + if self._input_table_columns_to_include: + command.append(f"--input_table_columns_to_include={self._input_table_columns_to_include}") + if self._input_table_columns_to_exclude: + command.append(f"--input_table_columns_to_exclude={self._input_table_columns_to_exclude}") return command diff --git a/dagger/pipeline/tasks/reverse_etl_task.py b/dagger/pipeline/tasks/reverse_etl_task.py index c0c6290..6160bfb 100644 --- a/dagger/pipeline/tasks/reverse_etl_task.py +++ b/dagger/pipeline/tasks/reverse_etl_task.py @@ -156,9 +156,21 @@ def init_attributes(cls, orig_cls): validator=str, required=False, comment='Optional JSON string for additional custom columns from static values. Example: \'{"custom_project": "ProjectXYZ", "model_name": "ModelABC"}\'' + ), + Attribute( + attribute_name="input_table_columns_to_include", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='Optional comma-separated list of columns to include in the job. Example: \'column1,column2,column3\', if not provided, all columns of input table will be included', + ), + Attribute( + attribute_name="input_table_columns_to_exclude", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='Optional comma-separated list of columns to exclude from the job. Example: \'column1,column2,column3\', if not provided, all columns of input table will be included', ) - - ] ) @@ -188,10 +200,15 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._glue_schema_name = self.parse_attribute("glue_schema_name") self._sort_key = self.parse_attribute("sort_key") self._custom_columns = self.parse_attribute("custom_columns") + self._input_table_columns_to_include = self.parse_attribute("input_table_columns_to_include") + self._input_table_columns_to_exclude = self.parse_attribute("input_table_columns_to_exclude") if self._hash_column and self._updated_at_column: raise ValueError(f"ReverseETLTask: {self._name} hash_column and updated_at_column are mutually exclusive") + if self._input_table_columns_to_include and self._input_table_columns_to_exclude: + raise ValueError(f"ReverseETLTask: {self._name} _input_table_columns_to_include and _input_table_columns_to_exclude are mutually exclusive") + if self._hash_column or self._updated_at_column: if not self._from_time: raise ValueError(f"ReverseETLTask: {self._name} from_time is required when hash_column or updated_at_column is provided") @@ -309,3 +326,11 @@ def sort_key(self): @property def custom_columns(self): return self._custom_columns + + @property + def input_table_columns_to_include(self): + return self._input_table_columns_to_include + + @property + def input_table_columns_to_exclude(self): + return self._input_table_columns_to_exclude