
Commit 9b02f03

enables refresh mode tests for athena, fixes tests

1 parent: c11b60c

File tree

1 file changed: +23 -15 lines

tests/load/pipeline/test_refresh_modes.py

Lines changed: 23 additions & 15 deletions
@@ -99,7 +99,7 @@ def some_data_4():
     "destination_config",
     destinations_configs(
         default_sql_configs=True,
-        subset=["duckdb", "filesystem", "iceberg"],
+        subset=["duckdb", "filesystem", "iceberg", "athena"],
         local_filesystem_configs=True,
         table_format_local_configs=True,
     ),
@@ -110,7 +110,9 @@ def some_data_4():
 def test_refresh_drop_sources(
     destination_config: DestinationTestConfiguration, in_source: bool, with_wipe: bool
 ):
-    pipeline = destination_config.setup_pipeline("refresh_source")
+    pipeline_name = "refresh_source"
+    dataset_name = pipeline_name + uniq_id()
+    pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)
 
     data: Any = refresh_source(first_run=True, drop_sources=True)
     if not in_source:
@@ -123,7 +125,7 @@ def test_refresh_drop_sources(
     # Second run of pipeline with only selected resources
     if with_wipe:
         pipeline._wipe_working_folder()
-        pipeline = destination_config.setup_pipeline("refresh_source")
+        pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)
 
     data = refresh_source(first_run=False, drop_sources=True).with_resources(
         "some_data_1", "some_data_2"
@@ -173,7 +175,9 @@ def test_existing_schema_hash(destination_config: DestinationTestConfiguration):
     """Test when new schema is identical to a previously stored schema after dropping and re-creating tables.
     The change should be detected regardless and tables are created again in destination db
     """
-    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
+    pipeline = destination_config.setup_pipeline(
+        "refresh_full_test", refresh="drop_sources", dev_mode=True
+    )
 
     info = pipeline.run(
         refresh_source(first_run=True, drop_sources=True), **destination_config.run_kwargs
@@ -210,15 +214,13 @@ def test_existing_schema_hash(destination_config: DestinationTestConfiguration):
     assert new_schema_hash == first_schema_hash
 
 
-pytest.mark.essential
-
-
+@pytest.mark.essential
 @pytest.mark.parametrize(
     "destination_config",
     destinations_configs(
         default_sql_configs=True,
         local_filesystem_configs=True,
-        subset=["duckdb", "filesystem", "iceberg"],
+        subset=["duckdb", "filesystem", "iceberg", "athena"],
         table_format_local_configs=True,
     ),
     ids=lambda x: x.name,
@@ -232,19 +234,21 @@ def test_refresh_drop_resources(
         pytest.skip("not needed")
 
     # First run pipeline with load to destination so tables are created
-    pipeline = destination_config.setup_pipeline("refresh_source")
+    pipeline_name = "refresh_source"
+    dataset_name = pipeline_name + uniq_id()
+    pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)
 
     data: Any = refresh_source(first_run=True)
     if not in_source:
         data = list(data.selected_resources.values())
 
-    info = pipeline.run(data, refresh="drop_resources", **destination_config.run_kwargs)
+    info = pipeline.run(data, **destination_config.run_kwargs)
     assert_load_info(info)
 
     # Second run of pipeline with only selected resources
     if with_wipe:
         pipeline._wipe_working_folder()
-        pipeline = destination_config.setup_pipeline("refresh_source")
+        pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)
 
     data = refresh_source(first_run=False).with_resources("some_data_1", "some_data_2")
     if not in_source:
@@ -298,7 +302,9 @@ def test_refresh_drop_resources(
 def test_refresh_drop_data_only(destination_config: DestinationTestConfiguration):
     """Refresh drop_data should truncate all selected tables before load"""
     # First run pipeline with load to destination so tables are created
-    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_data")
+    pipeline = destination_config.setup_pipeline(
+        "refresh_full_test", refresh="drop_data", dev_mode=True
+    )
 
     info = pipeline.run(
         refresh_source(first_run=True), write_disposition="append", **destination_config.run_kwargs
@@ -408,7 +414,9 @@ def source_2_data_2():
         yield source_2_data_1
         yield source_2_data_2
 
-    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
+    pipeline = destination_config.setup_pipeline(
+        "refresh_full_test", refresh="drop_sources", dev_mode=True
+    )
 
     # Run both sources
     info = pipeline.run(
@@ -459,7 +467,7 @@ def source_2_data_2():
     ids=lambda x: x.name,
 )
 def test_refresh_argument_to_run(destination_config: DestinationTestConfiguration):
-    pipeline = destination_config.setup_pipeline("refresh_full_test")
+    pipeline = destination_config.setup_pipeline("refresh_full_test", dev_mode=True)
 
     info = pipeline.run(refresh_source(first_run=True), **destination_config.run_kwargs)
     assert_load_info(info)
@@ -495,7 +503,7 @@ def test_refresh_argument_to_run(destination_config: DestinationTestConfiguration):
     ids=lambda x: x.name,
 )
 def test_refresh_argument_to_extract(destination_config: DestinationTestConfiguration):
-    pipeline = destination_config.setup_pipeline("refresh_full_test")
+    pipeline = destination_config.setup_pipeline("refresh_full_test", dev_mode=True)
 
     info = pipeline.run(refresh_source(first_run=True), **destination_config.run_kwargs)
     assert_load_info(info)
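The recurring fix above is run isolation: each test now either derives a unique dataset_name with uniq_id() or creates the pipeline with dev_mode=True, so repeated runs against a shared destination like Athena do not trip over tables left by a previous run. A minimal sketch of both variants, assuming dlt's public dlt.pipeline API and uniq_id from dlt.common.utils (setup_pipeline in the tests is a helper that wraps something similar, so the exact wiring may differ):

import dlt
from dlt.common.utils import uniq_id

pipeline_name = "refresh_source"
# Explicit variant: suffix the dataset with a unique id so every test run
# writes to its own schema on the shared destination.
dataset_name = pipeline_name + uniq_id()
pipeline = dlt.pipeline(
    pipeline_name=pipeline_name,
    destination="duckdb",  # stand-in; the tests parametrize real destinations
    dataset_name=dataset_name,
)

# Implicit variant: dev_mode=True makes dlt append a unique suffix to the
# dataset name itself whenever a new pipeline instance is created.
dev_pipeline = dlt.pipeline(
    pipeline_name="refresh_full_test",
    destination="duckdb",
    dataset_name="refresh_full_test_data",
    dev_mode=True,
)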

0 commit comments
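One of the "fixes tests" items is easy to miss: a bare pytest.mark.essential statement (no leading @) only builds a marker object and throws it away, so the marker was never applied to the test below it. One hunk restores the decorator form:

import pytest

pytest.mark.essential  # no-op: creates a MarkDecorator and discards it

@pytest.mark.essential  # decorator form actually attaches the marker
def test_marked():
    assert True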

Comments
 (0)
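For context on what these tests exercise: refresh can be set when the pipeline is created (as in the dev_mode hunks above) or passed per run()/extract() call, which is what test_refresh_argument_to_run and test_refresh_argument_to_extract cover. A sketch assuming dlt's documented refresh modes; some_rows is a hypothetical stand-in for the refresh_source fixture used by the tests:

import dlt

@dlt.resource
def some_rows():
    # hypothetical stand-in resource for illustration
    yield [{"id": 1}]

pipeline = dlt.pipeline("refresh_full_test", destination="duckdb")

# Passed to run(), refresh applies to that invocation only:
#   "drop_sources"   - drop tables and state for the sources in this run
#   "drop_resources" - drop tables and state only for selected resources
#   "drop_data"      - truncate tables and reset resource state, keep schema
pipeline.run(some_rows(), refresh="drop_data")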