Skip to content

Commit 78d9ab9

Browse files
authored
Merge pull request #12 from ydb-platform/incremental_merge
Incremental materializations support
2 parents 7884273 + 9238c63 commit 78d9ab9

File tree

7 files changed

+124
-43
lines changed

7 files changed

+124
-43
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,17 @@ profile_name:
7676
| `auto_partitioning_partition_size_mb` | Partition size in megabytes for automatic partitioning | `no` | |
7777
| `ttl` | Time-to-live (TTL) expression for automatic data expiration | `no` | |
7878

79+
#### Incremental
80+
81+
| Option | Description | Required | Default |
82+
| ------ | ----------- | -------- | ------- |
83+
| `incremental_strategy` | Strategy of incremental materialization. Current adapter supports only `merge` strategy, which will use `YDB`'s `UPSERT` operation. | `no` | `default` |
84+
| `primary_key` | Primary key expression to use during table creation | `yes` | |
85+
| `store_type` | Type of table. Available options are `row` and `column` | `no` | `row` |
86+
| `auto_partitioning_by_size` | Enable automatic partitioning by size. Available options are `ENABLED` and `DISABLED` | `no` | |
87+
| `auto_partitioning_partition_size_mb` | Partition size in megabytes for automatic partitioning | `no` | |
88+
| `ttl` | Time-to-live (TTL) expression for automatic data expiration | `no` | |
89+
7990
##### Example table configuration
8091

8192
```sql

dbt/adapters/ydb/column.py

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,13 @@
1111
@dataclass
1212
class YDBColumn(Column):
1313
TYPE_LABELS = {
14-
'STRING': 'String',
14+
'STRING': 'Text',
1515
'TIMESTAMP': 'DateTime',
16-
'FLOAT': 'Float32',
17-
'INTEGER': 'Int32',
16+
'FLOAT': 'Float',
17+
'INTEGER': 'Int64',
1818
}
1919
is_nullable: bool = False
20-
is_low_cardinality: bool = False
21-
_low_card_regex = re.compile(r'^LowCardinality\((.*)\)$')
2220
_nullable_regex = re.compile(r'^(.*)\?$')
23-
_fix_size_regex = re.compile(r'FixedString\((.*?)\)')
2421
_decimal_regex = re.compile(r'Decimal\((\d+), (\d+)\)')
2522

2623
def __init__(self, column: str, dtype: str) -> None:
@@ -30,11 +27,6 @@ def __init__(self, column: str, dtype: str) -> None:
3027

3128
dtype = self._inner_dtype(dtype)
3229

33-
if dtype.lower().startswith('fixedstring'):
34-
match_sized = self._fix_size_regex.search(dtype)
35-
if match_sized:
36-
char_size = int(match_sized.group(1))
37-
3830
if dtype.lower().startswith('decimal'):
3931
match_dec = self._decimal_regex.search(dtype)
4032
numeric_precision = 0
@@ -57,26 +49,16 @@ def data_type(self) -> str:
5749
else:
5850
data_t = self.dtype
5951

60-
if self.is_nullable or self.is_low_cardinality:
61-
data_t = self.nested_type(data_t, self.is_low_cardinality, self.is_nullable)
52+
if self.is_nullable:
53+
data_t = self.nested_type(data_t, self.is_nullable)
6254

6355
return data_t
6456

6557
def is_string(self) -> bool:
6658
return self.dtype.lower() in [
67-
'string',
68-
'fixedstring',
69-
'longblob',
70-
'longtext',
71-
'tinytext',
7259
'text',
73-
'varchar',
74-
'mediumblob',
75-
'blob',
76-
'tinyblob',
77-
'char',
78-
'mediumtext',
79-
] or self.dtype.lower().startswith('fixedstring')
60+
'utf8',
61+
]
8062

8163
def is_integer(self) -> bool:
8264
return self.dtype.lower().startswith('int') or self.dtype.lower().startswith('uint')
@@ -98,17 +80,15 @@ def string_size(self) -> int:
9880

9981
@classmethod
10082
def string_type(cls, size: int) -> str:
101-
return 'String'
83+
return 'Text'
10284

10385
@classmethod
10486
def numeric_type(cls, dtype: str, precision: Any, scale: Any) -> str:
10587
return f'Decimal({precision}, {scale})'
10688

10789
@classmethod
108-
def nested_type(cls, dtype: str, is_low_cardinality: bool, is_nullable: bool) -> str:
90+
def nested_type(cls, dtype: str, is_nullable: bool) -> str:
10991
template = "{}"
110-
if is_low_cardinality:
111-
template = template.format("LowCardinality({})")
11292
if is_nullable:
11393
template = template.format("Nullable({})")
11494
return template.format(dtype)
@@ -125,10 +105,6 @@ def can_expand_to(self, other_column: 'Column') -> bool:
125105
def _inner_dtype(self, dtype) -> str:
126106
inner_dtype = dtype.strip()
127107

128-
if low_card_match := self._low_card_regex.search(inner_dtype):
129-
self.is_low_cardinality = True
130-
inner_dtype = low_card_match.group(1)
131-
132108
if null_match := self._nullable_regex.search(inner_dtype):
133109
self.is_nullable = True
134110
inner_dtype = null_match.group(1)

dbt/adapters/ydb/impl.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def get_columns_in_relation(self, relation):
119119
return []
120120

121121
with dbapi_connection.cursor() as cur:
122-
cur.execute(f"SELECT * FROM {relation} LIMIT 0")
122+
cur.execute(f"SELECT * FROM {relation} LIMIT 1")
123123
return [YDBColumn(col[0], col[1]) for col in cur.description]
124124

125125
def get_rows_different_sql(
@@ -292,6 +292,22 @@ def parse_columns_from_relation(self, relation: BaseRelation) -> List[Dict[str,
292292
columns.append(column)
293293
return columns
294294

295+
def valid_incremental_strategies(self):
296+
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
297+
Not used to validate custom strategies defined by end users.
298+
"""
299+
return ["merge"]
300+
301+
@available.parse_none
302+
def get_column_schema_from_query(self, sql: str, *_) -> List[YDBColumn]:
303+
logger.info(f"Try to get column schema from query: \n{sql}")
304+
connection = self.connections.get_thread_connection()
305+
dbapi_connection = connection.handle
306+
307+
with dbapi_connection.cursor() as cur:
308+
cur.execute(sql)
309+
return [YDBColumn(col[0], col[1]) for col in cur.description]
310+
295311

296312

297313
COLUMNS_EQUAL_SQL = '''

dbt/include/ydb/macros/adapters.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,5 +187,5 @@ Example 3 of 3 of required macros that does not have a default implementation.
187187

188188
{% macro ydb__current_timestamp() -%}
189189
'''Returns current UTC time'''
190-
{# docs show not to be implemented currently. #}
190+
CurrentUtcTimestamp()
191191
{% endmacro %}

dbt/include/ydb/macros/materializations/models/incremental/incremental.sql

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
{% set relation_for_indexes = intermediate_relation %}
4545
{% set need_swap = true %}
4646
{% else %}
47-
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
47+
{% do run_query(get_create_table_as_sql(False, temp_relation, sql)) %}
48+
{% do to_drop.append(temp_relation) %}
4849
{% set relation_for_indexes = temp_relation %}
4950
{% set contract_config = config.get('contract') %}
5051
{% if not contract_config or not contract_config.enforced %}
@@ -104,4 +105,44 @@
104105

105106
{{ return({'relations': [target_relation]}) }}
106107

107-
{%- endmaterialization %}
108+
{%- endmaterialization %}
109+
110+
{% macro ydb__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
111+
{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
112+
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
113+
{%- set merge_update_columns = config.get('merge_update_columns') -%}
114+
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
115+
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
116+
{%- set sql_header = config.get('sql_header', none) -%}
117+
118+
{% if unique_key %}
119+
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
120+
{% for key in unique_key %}
121+
{% set this_key_match %}
122+
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
123+
{% endset %}
124+
{% do predicates.append(this_key_match) %}
125+
{% endfor %}
126+
{% else %}
127+
{% set source_unique_key = ("DBT_INTERNAL_SOURCE." ~ unique_key) | trim %}
128+
{% set target_unique_key = ("DBT_INTERNAL_DEST." ~ unique_key) | trim %}
129+
{% set unique_key_match = equals(source_unique_key, target_unique_key) | trim %}
130+
{% do predicates.append(unique_key_match) %}
131+
{% endif %}
132+
{% else %}
133+
{% do predicates.append('FALSE') %}
134+
{% endif %}
135+
136+
{{ sql_header if sql_header is not none }}
137+
138+
upsert into {{ target }}
139+
select {{ dest_cols_csv }} from {{ source }}
140+
141+
{% endmacro %}
142+
143+
144+
{% macro ydb__get_incremental_default_sql(arg_dict) %}
145+
146+
{% do return(get_incremental_merge_sql(arg_dict)) %}
147+
148+
{% endmacro %}

examples/jaffle_shop/models/orders.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ select
1818

1919
from {{ ref('stg_orders') }} as orders
2020

21-
2221
left join (
2322
select
2423
order_id,
Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,47 @@
1-
# from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange
1+
import pytest
22

3+
from dbt.tests.adapter.basic import files
4+
from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange
35

4-
# class TestIncremental(BaseIncremental):
5-
# pass
66

7+
config_materialized_incremental = """
8+
{{ config(materialized="incremental", primary_key="id") }}
9+
"""
710

8-
# class TestIncrementalNotSchemaChange(BaseIncrementalNotSchemaChange):
9-
# pass
11+
model_incremental = """
12+
select * from {{ source('raw', 'seed') }}
13+
{% if is_incremental() %}
14+
where id > 10
15+
{% endif %}
16+
""".strip()
17+
18+
incremental_not_schema_change_sql = """
19+
{{ config(
20+
materialized="incremental",
21+
primary_key="user_id_current_time",
22+
unique_key="user_id_current_time",
23+
on_schema_change="sync_all_columns"
24+
) }}
25+
select
26+
'1' || '-' || Cast(CurrentUtcTimestamp() as Text) as user_id_current_time,
27+
{% if is_incremental() %}
28+
'thisis18characters' as platform
29+
{% else %}
30+
'okthisis20characters' as platform
31+
{% endif %}
32+
"""
33+
34+
class TestIncremental(BaseIncremental):
35+
@pytest.fixture(scope="class")
36+
def models(self):
37+
return {
38+
"incremental.sql": config_materialized_incremental + model_incremental,
39+
"schema.yml": files.schema_base_yml,
40+
}
41+
42+
43+
44+
class TestIncrementalNotSchemaChange(BaseIncrementalNotSchemaChange):
45+
@pytest.fixture(scope="class")
46+
def models(self):
47+
return {"incremental_not_schema_change.sql": incremental_not_schema_change_sql}

0 commit comments

Comments
 (0)