Skip to content

Commit 80aea59

Browse files
committed
Snapshot support
1 parent 776c077 commit 80aea59

File tree

13 files changed

+512
-12
lines changed

13 files changed

+512
-12
lines changed

.github/docker/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ services:
55
restart: always
66
ports:
77
- "2136:2136"
8+
- "8765:8765"
89
hostname: localhost
910
environment:
1011
- YDB_USE_IN_MEMORY_PDISKS=true

dbt/adapters/ydb/column.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ def numeric_type(cls, dtype: str, precision: Any, scale: Any) -> str:
8989
@classmethod
9090
def nested_type(cls, dtype: str, is_nullable: bool) -> str:
9191
template = "{}"
92-
if is_nullable:
93-
template = template.format("Nullable({})")
92+
if not is_nullable:
93+
template = template.format("{} NOT NULL")
9494
return template.format(dtype)
9595

9696
def literal(self, value):

dbt/adapters/ydb/impl.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,47 @@ def valid_incremental_strategies(self):
300300

301301
@available.parse_none
302302
def get_column_schema_from_query(self, sql: str, *_) -> List[YDBColumn]:
303-
logger.info(f"Try to get column schema from query: \n{sql}")
303+
logger.debug(f"Try to get column schema from query: \n{sql}")
304304
connection = self.connections.get_thread_connection()
305305
dbapi_connection = connection.handle
306306

307307
with dbapi_connection.cursor() as cur:
308308
cur.execute(sql)
309309
return [YDBColumn(col[0], col[1]) for col in cur.description]
310310

311+
def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
312+
match interval:
313+
case "day":
314+
return f"{add_to} + DateTime::IntervalFromDays({number})"
315+
case "hour":
316+
return f"{add_to} + DateTime::IntervalFromHours({number})"
317+
case "minute":
318+
return f"{add_to} + DateTime::IntervalFromMinutes({number})"
319+
case "second":
320+
return f"{add_to} + DateTime::IntervalFromSeconds({number})"
321+
case "millisecond":
322+
return f"{add_to} + DateTime::IntervalFromMilliseconds({number})"
323+
case "microsecond":
324+
return f"{add_to} + DateTime::IntervalFromMicroseconds({number})"
325+
case _:
326+
raise DbtRuntimeError(
327+
f"Unsupported value for interval: {interval}, only day, hour,"
328+
"minute, second, millisecond, microsecond are supported"
329+
)
330+
331+
def string_add_sql(
332+
self,
333+
add_to: str,
334+
value: str,
335+
location="append",
336+
) -> str:
337+
if location == "append":
338+
return f"{add_to} || Text('{value}')"
339+
elif location == "prepend":
340+
return f"Text('{value}') || {add_to}"
341+
else:
342+
raise DbtRuntimeError(f'Got an unexpected location value of "{location}"')
343+
311344

312345

313346
COLUMNS_EQUAL_SQL = '''

dbt/include/ydb/macros/adapters.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,5 @@ Example 3 of 3 of required macros that does not have a default implementation.
186186
*/
187187

188188
{% macro ydb__current_timestamp() -%}
189-
'''Returns current UTC time'''
190189
CurrentUtcTimestamp()
191190
{% endmacro %}

dbt/include/ydb/macros/materializations/models/table.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
, TTL = {{ ttl_expr }}
3636
{%- endif -%}
3737
)
38-
as (
38+
as
3939
{{ sql }}
40-
)
40+
4141
{%- endmacro %}
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
2+
{% macro ydb__build_snapshot_table(strategy, sql) %}
3+
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
4+
5+
select `sbq`.*,
6+
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
7+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
8+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
9+
{{ ydb__get_dbt_valid_to_current(strategy, columns) }}
10+
{%- if strategy.hard_deletes == 'new_record' -%}
11+
, 'False' as {{ columns.dbt_is_deleted }}
12+
{% endif -%}
13+
from (
14+
{{ sql }}
15+
) sbq
16+
17+
{% endmacro %}
18+
19+
{% macro ydb__get_dbt_valid_to_current(strategy, columns) %}
20+
{% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "null" %}
21+
coalesce(case when {{ strategy.updated_at }} = {{ strategy.updated_at }} then null else {{ strategy.updated_at }} end, {{dbt_valid_to_current}})
22+
as {{ columns.dbt_valid_to }}
23+
{% endmacro %}
24+
25+
{% macro ydb__build_snapshot_staging_table(strategy, sql, target_relation) %}
26+
{% set temp_relation = make_temp_relation(target_relation) %}
27+
28+
{% do drop_relation(temp_relation) %}
29+
30+
{% set select = snapshot_staging_table(strategy, sql, target_relation) %}
31+
32+
{% call statement('build_snapshot_staging_relation') %}
33+
{{ ydb__create_snapshot_table_as(False, temp_relation, select) }}
34+
{% endcall %}
35+
36+
{% do return(temp_relation) %}
37+
{% endmacro %}
38+
39+
{% macro ydb__create_snapshot_table_as(temporary, relation, sql) -%}
40+
{%- set sql_header = config.get('sql_header', none) -%}
41+
42+
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
43+
44+
{%- set primary_key_expr = columns.dbt_scd_id -%}
45+
46+
{%- set store_type = 'row' -%}
47+
48+
{%- set auto_partitioning_by_size = model['config'].get('auto_partitioning_by_size') -%}
49+
{%- set auto_partitioning_partition_size_mb = model['config'].get('auto_partitioning_partition_size_mb') -%}
50+
{%- set ttl_expr = model['config'].get('ttl') -%}
51+
52+
{{ sql_header if sql_header is not none }}
53+
54+
create {% if temporary: -%}temporary{%- endif %} table
55+
{{ relation.include(database=(not temporary), schema=(not temporary)) }}
56+
(primary key ({{ primary_key_expr }}))
57+
{% set contract_config = config.get('contract') %}
58+
{% if contract_config.enforced and (not temporary) %}
59+
{{ get_assert_columns_equivalent(sql) }}
60+
{{ get_table_columns_and_constraints() }}
61+
{%- set sql = get_select_subquery(sql) %}
62+
{% endif %}
63+
WITH (
64+
STORE = {{ store_type }}
65+
{%- if auto_partitioning_by_size is not none -%}
66+
, AUTO_PARTITIONING_BY_SIZE = {{ auto_partitioning_by_size }}
67+
{%- endif -%}
68+
{%- if auto_partitioning_partition_size_mb is not none -%}
69+
, AUTO_PARTITIONING_PARTITION_SIZE_MB = {{ auto_partitioning_partition_size_mb }}
70+
{%- endif -%}
71+
{%- if ttl_expr is not none -%}
72+
, TTL = {{ ttl_expr }}
73+
{%- endif -%}
74+
)
75+
as
76+
{{ sql }}
77+
78+
{%- endmacro %}
79+
80+
81+
{% macro ydb__snapshot_staging_table(strategy, source_sql, target_relation) -%}
82+
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
83+
{% if strategy.hard_deletes == 'new_record' %}
84+
{% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
85+
{% endif %}
86+
87+
select `insertions`.* from (
88+
select
89+
'insert' as dbt_change_type,
90+
`source_data`.*
91+
{%- if strategy.hard_deletes == 'new_record' -%}
92+
,'False' as {{ columns.dbt_is_deleted }}
93+
{%- endif %}
94+
95+
from (
96+
select `source`.*, {{ unique_key_fields(strategy.unique_key) }},
97+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
98+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
99+
{{ ydb__get_dbt_valid_to_current(strategy, columns) }},
100+
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
101+
102+
from ({{ source_sql }}) as source
103+
104+
) as source_data
105+
left outer join (
106+
select `target`.*, {{ unique_key_fields(strategy.unique_key) }}
107+
from {{ target_relation }} as target
108+
where
109+
{% if config.get('dbt_valid_to_current') %}
110+
{% set source_unique_key = columns.dbt_valid_to | trim %}
111+
{% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
112+
113+
{# The exact equals semantics between NULL values depends on the current behavior flag set. Also, update records if the source field is null #}
114+
( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
115+
{% else %}
116+
{{ columns.dbt_valid_to }} is null
117+
{% endif %}
118+
) as snapshotted_data
119+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
120+
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
121+
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and (
122+
{{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' {% endif %}
123+
)
124+
125+
)
126+
) as insertions
127+
union all
128+
select `updates`.* from (
129+
select
130+
'update' as dbt_change_type,
131+
source_data.*,
132+
snapshotted_data.{{ columns.dbt_scd_id }} as {{ columns.dbt_scd_id }},
133+
{%- if strategy.hard_deletes == 'new_record' -%}
134+
, snapshotted_data.{{ columns.dbt_is_deleted }}
135+
{%- endif %}
136+
137+
from (
138+
select `source`.*, {{ unique_key_fields(strategy.unique_key) }},
139+
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
140+
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
141+
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
142+
143+
from ({{ source_sql }}) as source
144+
) as source_data
145+
join (
146+
select `target`.*, {{ unique_key_fields(strategy.unique_key) }}
147+
from {{ target_relation }} as target
148+
where
149+
{% if config.get('dbt_valid_to_current') %}
150+
{% set source_unique_key = columns.dbt_valid_to | trim %}
151+
{% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
152+
153+
{# The exact equals semantics between NULL values depends on the current behavior flag set. Also, update records if the source field is null #}
154+
( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
155+
{% else %}
156+
{{ columns.dbt_valid_to }} is null
157+
{% endif %}
158+
) as snapshotted_data
159+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
160+
where (
161+
{{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' {% endif %}
162+
)
163+
) as updates
164+
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
165+
union all
166+
select `deletes`.* from (
167+
select
168+
'delete' as dbt_change_type,
169+
-- source_data.*, we are not able to use it because we lose not null constraint on unique key
170+
snapshotted_data.{{ strategy.unique_key }} as {{ strategy.unique_key }},
171+
172+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
173+
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
174+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
175+
snapshotted_data.{{ columns.dbt_scd_id }} as {{ columns.dbt_scd_id }},
176+
{%- if strategy.hard_deletes == 'new_record' -%}
177+
, snapshotted_data.{{ columns.dbt_is_deleted }}
178+
{%- endif %}
179+
from (
180+
select `target`.*, {{ unique_key_fields(strategy.unique_key) }}
181+
from {{ target_relation }} as target
182+
where
183+
{% if config.get('dbt_valid_to_current') %}
184+
{% set source_unique_key = columns.dbt_valid_to | trim %}
185+
{% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
186+
187+
{# The exact equals semantics between NULL values depends on the current behavior flag set. Also, update records if the source field is null #}
188+
( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
189+
{% else %}
190+
{{ columns.dbt_valid_to }} is null
191+
{% endif %}
192+
) as snapshotted_data
193+
left join (
194+
select `source`.*, {{ unique_key_fields(strategy.unique_key) }}
195+
from ({{ source_sql }}) as source
196+
) as source_data
197+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
198+
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
199+
200+
{%- if strategy.hard_deletes == 'new_record' %}
201+
and not (
202+
--avoid updating the record's valid_to if the latest entry is marked as deleted
203+
snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
204+
and snapshotted_data.{{ columns.dbt_valid_to }} is null
205+
)
206+
{%- endif %}
207+
) as deletes
208+
{%- endif %}
209+
{%- if strategy.hard_deletes == 'new_record' %}
210+
union all
211+
select `deletion_records`.* from (
212+
select
213+
'insert' as dbt_change_type,
214+
{#
215+
If a column has been added to the source it won't yet exist in the
216+
snapshotted table so we insert a null value as a placeholder for the column.
217+
#}
218+
{%- for col in source_sql_cols -%}
219+
{%- if col.name in snapshotted_cols -%}
220+
snapshotted_data.{{ adapter.quote(col.column) }},
221+
{%- else -%}
222+
NULL as {{ adapter.quote(col.column) }},
223+
{%- endif -%}
224+
{% endfor -%}
225+
{%- if strategy.unique_key | is_list -%}
226+
{%- for key in strategy.unique_key -%}
227+
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
228+
{% endfor -%}
229+
{%- else -%}
230+
snapshotted_data.dbt_unique_key as dbt_unique_key,
231+
{% endif -%}
232+
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
233+
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
234+
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
235+
{{ new_scd_id }} as {{ columns.dbt_scd_id }},
236+
'True' as {{ columns.dbt_is_deleted }}
237+
from (
238+
select `target`.*, {{ unique_key_fields(strategy.unique_key) }}
239+
from {{ target_relation }} as target
240+
where
241+
{% if config.get('dbt_valid_to_current') %}
242+
{% set source_unique_key = columns.dbt_valid_to | trim %}
243+
{% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
244+
245+
{# The exact equals semantics between NULL values depends on the current behavior flag set. Also, update records if the source field is null #}
246+
( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
247+
{% else %}
248+
{{ columns.dbt_valid_to }} is null
249+
{% endif %}
250+
) as snapshotted_data
251+
left join (
252+
select `source`.*, {{ unique_key_fields(strategy.unique_key) }}
253+
from ({{ source_sql }}) as source
254+
) as source_data
255+
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
256+
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
257+
and not (
258+
--avoid inserting a new record if the latest one is marked as deleted
259+
snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
260+
and snapshotted_data.{{ columns.dbt_valid_to }} is null
261+
)
262+
) as deletion_records
263+
{%- endif %}
264+
265+
266+
{%- endmacro %}

0 commit comments

Comments
 (0)