Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def __init__(self, task, dag):
self._glue_schema_name = task.glue_schema_name
self._sort_key = task.sort_key
self._custom_columns = task.custom_columns
self._input_table_columns_to_include = task.input_table_columns_to_include
self._input_table_columns_to_exclude = task.input_table_columns_to_exclude

def _generate_command(self):
command = BatchCreator._generate_command(self)
Expand Down Expand Up @@ -75,7 +77,10 @@ def _generate_command(self):
command.append(f"--sort_key={self._sort_key}")
if self._custom_columns:
command.append(f"--custom_columns={self._custom_columns}")

if self._input_table_columns_to_include:
command.append(f"--input_table_columns_to_include={self._input_table_columns_to_include}")
if self._input_table_columns_to_exclude:
command.append(f"--input_table_columns_to_exclude={self._input_table_columns_to_exclude}")

return command

Expand Down
29 changes: 27 additions & 2 deletions dagger/pipeline/tasks/reverse_etl_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,21 @@ def init_attributes(cls, orig_cls):
validator=str,
required=False,
comment='Optional JSON string for additional custom columns from static values. Example: \'{"custom_project": "ProjectXYZ", "model_name": "ModelABC"}\''
),
Attribute(
attribute_name="input_table_columns_to_include",
parent_fields=["task_parameters"],
validator=str,
required=False,
comment='Optional comma-separated list of columns to include in the job. Example: \'column1,column2,column3\', if not provided, all columns of input table will be included',
),
Attribute(
attribute_name="input_table_columns_to_exclude",
parent_fields=["task_parameters"],
validator=str,
required=False,
comment='Optional comma-separated list of columns to exclude from the job. Example: \'column1,column2,column3\', if not provided, all columns of input table will be included',
)


]
)

Expand Down Expand Up @@ -188,10 +200,15 @@ def __init__(self, name, pipeline_name, pipeline, job_config):
self._glue_schema_name = self.parse_attribute("glue_schema_name")
self._sort_key = self.parse_attribute("sort_key")
self._custom_columns = self.parse_attribute("custom_columns")
self._input_table_columns_to_include = self.parse_attribute("input_table_columns_to_include")
self._input_table_columns_to_exclude = self.parse_attribute("input_table_columns_to_exclude")

if self._hash_column and self._updated_at_column:
raise ValueError(f"ReverseETLTask: {self._name} hash_column and updated_at_column are mutually exclusive")

if self._input_table_columns_to_include and self._input_table_columns_to_exclude:
raise ValueError(f"ReverseETLTask: {self._name} _input_table_columns_to_include and _input_table_columns_to_exclude are mutually exclusive")

if self._hash_column or self._updated_at_column:
if not self._from_time:
raise ValueError(f"ReverseETLTask: {self._name} from_time is required when hash_column or updated_at_column is provided")
Expand Down Expand Up @@ -309,3 +326,11 @@ def sort_key(self):
@property
def custom_columns(self):
    """Return the ``custom_columns`` value stored on this task.

    The value is whatever was assigned to ``self._custom_columns``
    (read from the ``custom_columns`` attribute of the job config);
    presumably the optional JSON string of static custom columns --
    confirm against the attribute definition.
    """
    configured_value = self._custom_columns
    return configured_value

@property
def input_table_columns_to_include(self):
    """Return the ``input_table_columns_to_include`` value stored on this task.

    The attribute is declared as an optional comma-separated list of
    input-table columns to include (e.g. ``column1,column2,column3``).
    The stored value is returned unchanged.
    """
    included_columns = self._input_table_columns_to_include
    return included_columns

@property
def input_table_columns_to_exclude(self):
    """Return the ``input_table_columns_to_exclude`` value stored on this task.

    The attribute is declared as an optional comma-separated list of
    input-table columns to exclude (e.g. ``column1,column2,column3``).
    The stored value is returned unchanged.
    """
    excluded_columns = self._input_table_columns_to_exclude
    return excluded_columns