Commit 8bd0b11

fixes athena refresh mode (#3313)
* adds filter to exclude dropped tables in staging destination, implements it for athena
* enables refresh mode tests for athena, fixes tests
* fixes staging_allowed_local_path on databricks, bumps databricks connector in lockfile
* passes dropped table schemas to the filter, adjusts the athena filter
* allows disabling lake formation
1 parent 3bd5099 commit 8bd0b11
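
For context, a minimal sketch of the scenario this commit fixes: a refresh run against Athena with a filesystem staging destination, where dropping tables must also remove the staged data of non-iceberg tables. The pipeline name, sample rows and bucket URL are placeholders, credentials are assumed to come from the usual dlt config/secrets, and refresh="drop_sources" is taken from dlt's documented refresh modes; treat this as an illustration, not part of the commit.

import os
import dlt

# placeholder staging bucket; Athena/Glue credentials are read from dlt config or env
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "s3://my-staging-bucket/dlt-data"

pipeline = dlt.pipeline(
    pipeline_name="athena_refresh_demo",
    destination="athena",
    staging="filesystem",  # Athena loads parquet files staged on s3
    dataset_name="refresh_demo",
)

# "drop_sources" drops the source's tables before loading; with this fix the staged
# files of dropped non-iceberg tables are now removed on the staging destination as well
pipeline.run([{"id": 1}, {"id": 2}], table_name="items", refresh="drop_sources")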

13 files changed: +237 / -217 lines

dlt/common/destination/client.py

Lines changed: 9 additions & 0 deletions
@@ -44,6 +44,7 @@
     C_DLT_LOAD_ID,
     TLoaderReplaceStrategy,
     TTableFormat,
+    TTableSchema,
 )
 from dlt.common.destination.capabilities import DestinationCapabilitiesContext
 from dlt.common.destination.exceptions import (
@@ -686,6 +687,14 @@ def should_truncate_table_before_load_on_staging_destination(self, table_name: s
         """
         pass
 
+    def should_drop_table_on_staging_destination(self, dropped_table: TTableSchema) -> bool:
+        """Tells if `dropped_table` should be dropped on the staging destination (regular dataset) in addition to being
+        dropped on the final destination. This stays False for all destinations except Athena (non-iceberg), where the
+        staging destination holds the actual data that needs to be deleted.
+        Note that `dropped_table` may no longer be present in the schema. It is present only if it got recreated.
+        """
+        return False
+
 
 class SupportsOpenTables(ABC):
     """Provides access to data stored in one of open table formats (iceberg or delta) and intended to

dlt/destinations/impl/athena/athena.py

Lines changed: 14 additions & 0 deletions
@@ -11,6 +11,8 @@
     TYPE_CHECKING,
 )
 
+from dlt.common.schema.exceptions import TableNotFound
+
 if TYPE_CHECKING:
     from mypy_boto3_lakeformation import LakeFormationClient
     from mypy_boto3_lakeformation.type_defs import (
@@ -35,6 +37,7 @@
     TColumnType,
     TSchemaTables,
     TSortOrder,
+    TTableSchema,
 )
 from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema
 from dlt.common.destination.client import FollowupJobRequest, SupportsStagingDestination, LoadJob
@@ -345,6 +348,7 @@ def update_stored_schema(
         if (
             self.config.lakeformation_config is not None
             and self.config.lakeformation_config.enabled
+            is not None  # both True and False are actionable
         ):
             self.manage_lf_tags()
         return applied_update
@@ -406,6 +410,16 @@ def should_truncate_table_before_load_on_staging_destination(self, table_name: s
             return True
         return False
 
+    def should_drop_table_on_staging_destination(self, dropped_table: TTableSchema) -> bool:
+        # in Athena the table must be dropped in Glue, and its data must also be dropped in staging if the table is not iceberg
+        try:
+            existing_table = self.prepare_load_table(dropped_table["name"])
+            # do not drop data if a new iceberg table got created - storage is handled by Athena
+            return not self._is_iceberg_table(existing_table)
+        except TableNotFound:
+            # table got dropped and was not recreated - drop on the staging destination
+            return True
+
     def should_load_data_to_staging_dataset_on_staging_destination(self, table_name: str) -> bool:
         """iceberg table data goes into staging on staging destination"""
         table = self.prepare_load_table(table_name)
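
To spell out the three branches of the Athena hook above, here is a standalone paraphrase for illustration only (not the client code): the Glue table lookup and the iceberg check are collapsed into plain parameters.

def drop_staged_data(recreated_table, is_iceberg):
    """Mirrors the Athena client's should_drop_table_on_staging_destination from the hunk above."""
    if recreated_table is None:
        # table was dropped and not recreated: the staged files are all that is left, remove them
        return True
    # recreated as iceberg: Athena manages the storage, leave staging alone
    # recreated as a regular hive table: the staged parquet is the actual data, remove it
    return not is_iceberg


assert drop_staged_data(None, is_iceberg=False) is True
assert drop_staged_data({"name": "items"}, is_iceberg=True) is False
assert drop_staged_data({"name": "items"}, is_iceberg=False) is True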

dlt/destinations/impl/athena/configuration.py

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 
 @configspec
 class LakeformationConfig:
-    enabled: bool = False
+    enabled: Optional[bool] = None
     tags: Optional[Dict[str, str]] = None
 
 
dlt/destinations/impl/databricks/databricks.py

Lines changed: 9 additions & 4 deletions
@@ -72,10 +72,15 @@ def run(self) -> None:
         # decide if this is a local file or a staged file
         is_local_file = not ReferenceFollowupJobRequest.is_reference_job(self._file_path)
         if is_local_file:
-            # conn parameter staging_allowed_local_path must be set to use 'PUT/REMOVE volume_path' SQL statement
-            self._sql_client.native_connection.thrift_backend.staging_allowed_local_path = (
-                os.path.dirname(self._file_path)
-            )
+            # staging_allowed_local_path should be set when opening the connection, but at that
+            # time we do not yet know this path, so set it now
+            conn_ = self._sql_client.native_connection
+            file_dir = os.path.dirname(self._file_path)
+            if backend := getattr(conn_, "thrift_backend", None):
+                backend.staging_allowed_local_path = file_dir
+            else:
+                # the thrift backend was discontinued in newer databricks connector clients
+                conn_.staging_allowed_local_path = file_dir
             # local file by uploading to a temporary volume on Databricks
             from_clause, file_name, volume_path, volume_file_path = self._handle_local_file_upload(
                 self._file_path
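
The same fallback in isolation, as a sketch. It assumes only what the change above relies on: older databricks-sql-connector releases expose staging_allowed_local_path on connection.thrift_backend, newer ones expose it on the connection object itself.

import os


def allow_local_staging(connection, file_path: str) -> None:
    # volume PUT/REMOVE statements only accept local files under this directory
    file_dir = os.path.dirname(file_path)
    backend = getattr(connection, "thrift_backend", None)
    if backend is not None:
        # connector releases that still ship the thrift backend
        backend.staging_allowed_local_path = file_dir
    else:
        # newer connector releases dropped the thrift backend attribute
        connection.staging_allowed_local_path = file_dir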

dlt/destinations/impl/dummy/dummy.py

Lines changed: 3 additions & 0 deletions
@@ -148,6 +148,9 @@ def is_storage_initialized(self) -> bool:
     def drop_storage(self) -> None:
         pass
 
+    def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
+        pass
+
     def update_stored_schema(
         self,
         only_tables: Iterable[str] = None,

dlt/load/load.py

Lines changed: 4 additions & 1 deletion
@@ -539,6 +539,7 @@ def initialize_package(
                 if isinstance(job_client, WithStagingDataset)
                 else None
             ),
+            lambda table_name: True,  # drop all passed tables
             drop_tables=dropped_tables,
             truncate_tables=truncated_tables,
         )
@@ -556,9 +557,11 @@
             schema,
             new_jobs,
             expected_update,
-            job_client.should_truncate_table_before_load_on_staging_destination,
             # should_truncate_staging,
+            job_client.should_truncate_table_before_load_on_staging_destination,
             job_client.should_load_data_to_staging_dataset_on_staging_destination,
+            # should we drop tables also on the staging destination
+            job_client.should_drop_table_on_staging_destination,
             drop_tables=dropped_tables,
             truncate_tables=truncated_tables,
         )

dlt/load/utils.py

Lines changed: 11 additions & 4 deletions
@@ -69,6 +69,7 @@ def init_client(
     expected_update: TSchemaTables,
     truncate_filter: Callable[[str], bool],
     load_staging_filter: Callable[[str], bool],
+    drop_staging_filter: Callable[[TTableSchema], bool],
     drop_tables: Optional[List[TTableSchema]] = None,
     truncate_tables: Optional[List[TTableSchema]] = None,
 ) -> TSchemaTables:
@@ -81,8 +82,9 @@
         schema (Schema): The schema as in load package
         new_jobs (Iterable[LoadJobInfo]): List of new jobs
         expected_update (TSchemaTables): Schema update as in load package. Always present even if empty
-        truncate_filter (Callable[[str], bool]): A filter that tells which table in destination dataset should be truncated
-        load_staging_filter (Callable[[str], bool]): A filter which tell which table in the staging dataset may be loaded into
+        truncate_filter (Callable[[str], bool]): A filter that tells if a table in the destination dataset should be truncated
+        load_staging_filter (Callable[[str], bool]): A filter that tells if a table may be loaded into the staging dataset
+        drop_staging_filter (Callable[[TTableSchema], bool]): A filter that tells if a table may be dropped on the staging destination
         drop_tables (Optional[List[TTableSchema]]): List of tables to drop before initializing storage
         truncate_tables (Optional[List[TTableSchema]]): List of tables to truncate before initializing storage
@@ -111,8 +113,13 @@
         )
     )
 
-    # get tables to drop
-    drop_table_names = {table["name"] for table in drop_tables} if drop_tables else set()
+    # get tables to drop; note that drop_tables are not in the schema and come from the package state
+    drop_table_names = (
+        {table["name"] for table in drop_tables if drop_staging_filter(table)}
+        if drop_tables
+        else set()
+    )
     job_client.verify_schema(only_tables=tables_with_jobs | dlt_tables, new_jobs=new_jobs)
     applied_update = _init_dataset_and_update_schema(
         job_client,
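
Condensed from the last hunk: the final destination (first init_client call in dlt/load/load.py) passes a pass-all filter, while the staging destination passes the client hook, so only tables for which the hook returns True end up in drop_table_names. The helper below is an illustration of that shape, not the dlt code.

from typing import Callable, List, Set


def tables_to_drop(drop_tables: List[dict], drop_filter: Callable[[dict], bool]) -> Set[str]:
    # same shape as the expression added to init_client above
    return {t["name"] for t in drop_tables if drop_filter(t)} if drop_tables else set()


dropped = [{"name": "items", "table_format": "iceberg"}, {"name": "raw_events"}]
# final destination: drop everything the package asks for
assert tables_to_drop(dropped, lambda table: True) == {"items", "raw_events"}
# staging destination: delegate to should_drop_table_on_staging_destination, e.g. keep iceberg-backed tables
assert tables_to_drop(dropped, lambda table: table.get("table_format") != "iceberg") == {"raw_events"}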

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -142,7 +142,7 @@ qdrant = [
     "qdrant-client[fastembed]>=1.8"
 ]
 databricks = [
-    "databricks-sql-connector>=2.9.3,<4 ; python_version <= '3.12'",
+    "databricks-sql-connector>=2.9.3 ; python_version <= '3.12'",
     "databricks-sql-connector>=3.6.0 ; python_version >= '3.13'",
     "databricks-sdk>=0.38.0",
 ]

tests/load/athena_iceberg/test_lakeformation.py

Lines changed: 3 additions & 1 deletion
@@ -186,7 +186,9 @@ def create_pipelines(
     )
     lf_disabled_pipeline = destination_config.setup_pipeline(
         pipeline_name,
-        destination=destination_config.destination_factory(),
+        destination=destination_config.destination_factory(
+            lakeformation_config=LakeformationConfig(enabled=False)
+        ),
         dataset_name=dataset_name,
         staging=staging_destination,
     )
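
The LakeformationConfig(enabled=False) passed above exercises the tri-state flag introduced in dlt/destinations/impl/athena/configuration.py. A short sketch of the three settings; the add/remove semantics are inferred from the gating comment in athena.py and the tests, so treat them as a reading of this change rather than documentation.

from dlt.destinations.impl.athena.configuration import LakeformationConfig

# None (new default): Lake Formation tag management is skipped entirely
passthrough = LakeformationConfig()

# False: explicitly disabled - tag management still runs, so previously applied tags can be removed
disabled = LakeformationConfig(enabled=False)

# True: enabled - tag management runs and applies the configured tags to the dataset
enabled = LakeformationConfig(enabled=True, tags={"team": "analytics"})

# the same flag can also come from config, e.g. the env var used in tests/load/pipeline/test_athena.py:
# DESTINATION__LAKEFORMATION_CONFIG__ENABLED=False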

tests/load/pipeline/test_athena.py

Lines changed: 7 additions & 3 deletions
@@ -35,12 +35,15 @@
     ),
     ids=lambda x: x.name,
 )
-@pytest.mark.parametrize("lf_enabled", [True, False], ids=["lf-on", "lf-off"])
+@pytest.mark.parametrize(
+    "lf_enabled", [True, False, None], ids=["lf-on", "lf-off", "lf-passthrough"]
+)
 def test_athena_lakeformation_config_gating(
     destination_config: DestinationTestConfiguration, lf_enabled: bool, mocker, monkeypatch
 ) -> None:
     # Configure Lake Formation gating via env (read by client config)
-    monkeypatch.setenv("DESTINATION__LAKEFORMATION_CONFIG__ENABLED", str(lf_enabled))
+    if lf_enabled is not None:
+        monkeypatch.setenv("DESTINATION__LAKEFORMATION_CONFIG__ENABLED", str(lf_enabled))
 
     pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True)
 
@@ -55,7 +58,8 @@ def test_athena_lakeformation_config_gating(
     )
 
     client.update_stored_schema()
-    if lf_enabled:
+    # both True and False trigger tag management (tags get added or removed), None skips it
+    if lf_enabled is not None:
         mocked_manage.assert_called()
     else:
         mocked_manage.assert_not_called()
