Skip to content

Commit 3544617

Browse files
committed
passes dropped table schemas to filter, adjust athena filter
1 parent d417a34 commit 3544617

File tree

4 files changed

+26
-14
lines changed

4 files changed

+26
-14
lines changed

dlt/common/destination/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
C_DLT_LOAD_ID,
4545
TLoaderReplaceStrategy,
4646
TTableFormat,
47+
TTableSchema,
4748
)
4849
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
4950
from dlt.common.destination.exceptions import (
@@ -686,10 +687,11 @@ def should_truncate_table_before_load_on_staging_destination(self, table_name: s
686687
"""
687688
pass
688689

689-
def should_drop_table_on_staging_destination(self, table_name: str) -> bool:
690-
"""Tells if table should be dropped on staging destination (regular dataset) in addition to dropping the table on
690+
def should_drop_table_on_staging_destination(self, dropped_table: TTableSchema) -> bool:
691+
"""Tells if `dropped_table` should be dropped on staging destination (regular dataset) in addition to dropping the table on
691692
final destination. This stays False for all the destinations except Athena, non-iceberg where staging destination
692693
holds actual data which needs to be deleted.
694+
Note that `dropped_table` may no longer be present in the schema. It is present only if it got recreated.
693695
"""
694696
return False
695697

dlt/destinations/impl/athena/athena.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
TYPE_CHECKING,
1212
)
1313

14+
from dlt.common.schema.exceptions import TableNotFound
15+
1416
if TYPE_CHECKING:
1517
from mypy_boto3_lakeformation import LakeFormationClient
1618
from mypy_boto3_lakeformation.type_defs import (
@@ -35,6 +37,7 @@
3537
TColumnType,
3638
TSchemaTables,
3739
TSortOrder,
40+
TTableSchema,
3841
)
3942
from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema
4043
from dlt.common.destination.client import FollowupJobRequest, SupportsStagingDestination, LoadJob
@@ -406,10 +409,15 @@ def should_truncate_table_before_load_on_staging_destination(self, table_name: s
406409
return True
407410
return False
408411

409-
def should_drop_table_on_staging_destination(self, table_name: str) -> bool:
412+
def should_drop_table_on_staging_destination(self, dropped_table: TTableSchema) -> bool:
410413
# in Athena we must drop table in glue and then we must drop data in staging if table is not iceberg
411-
table = self.prepare_load_table(table_name)
412-
return not self._is_iceberg_table(table)
414+
try:
415+
existing_table = self.prepare_load_table(dropped_table["name"])
416+
# do not drop data if new iceberg table got created - storage is handled by Athena
417+
return not self._is_iceberg_table(existing_table)
418+
except TableNotFound:
419+
# table got dropped and is not recreated - drop staging destination
420+
return True
413421

414422
def should_load_data_to_staging_dataset_on_staging_destination(self, table_name: str) -> bool:
415423
"""iceberg table data goes into staging on staging destination"""

dlt/load/utils.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def init_client(
6969
expected_update: TSchemaTables,
7070
truncate_filter: Callable[[str], bool],
7171
load_staging_filter: Callable[[str], bool],
72-
drop_staging_filter: Callable[[str], bool],
72+
drop_staging_filter: Callable[[TTableSchema], bool],
7373
drop_tables: Optional[List[TTableSchema]] = None,
7474
truncate_tables: Optional[List[TTableSchema]] = None,
7575
) -> TSchemaTables:
@@ -113,9 +113,10 @@ def init_client(
113113
)
114114
)
115115

116-
# get tables to drop
116+
# get tables to drop, note that drop_tables are not in schema and come from the package
117+
# state
117118
drop_table_names = (
118-
{table["name"] for table in drop_tables if drop_staging_filter(table["name"])}
119+
{table["name"] for table in drop_tables if drop_staging_filter(table)}
119120
if drop_tables
120121
else set()
121122
)

tests/load/pipeline/test_refresh_modes.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,12 @@ def test_refresh_drop_sources(
118118
if not in_source:
119119
data = list(data.selected_resources.values())
120120

121-
# First run pipeline so destination so tables are created
121+
# first run pipeline so destination tables are created
122122
info = pipeline.run(data, refresh="drop_sources", **destination_config.run_kwargs)
123123
assert_load_info(info)
124+
assert table_exists(pipeline, "some_data_3")
124125

125-
# Second run of pipeline with only selected resources
126+
# second run of pipeline with only selected resources
126127
if with_wipe:
127128
pipeline._wipe_working_folder()
128129
pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)
@@ -144,16 +145,16 @@ def test_refresh_drop_sources(
144145
"some_data_2",
145146
}
146147

147-
# No "name" column should exist as table was dropped and re-created without it
148+
# no "name" column should exist as table was dropped and re-created without it
148149
assert_only_table_columns(pipeline, "some_data_1", ["id"])
149150
data = load_tables_to_dicts(pipeline, "some_data_1")["some_data_1"]
150151
result = sorted([row["id"] for row in data])
151-
# Only rows from second run should exist
152+
# only rows from second run should exist
152153
assert result == [3, 4]
153154

154-
# Confirm resource tables not selected on second run got dropped
155+
# confirm resource tables not selected on second run got dropped
155156
assert not table_exists(pipeline, "some_data_3")
156-
# Loaded state is wiped
157+
# loaded state is wiped
157158
with pipeline.destination_client() as dest_client:
158159
destination_state = load_pipeline_state_from_destination(
159160
pipeline.pipeline_name, dest_client # type: ignore[arg-type]

0 commit comments

Comments
 (0)