18 changes: 18 additions & 0 deletions dbt/adapters/clickhouse/__init__.py
@@ -1,4 +1,5 @@
from dbt.adapters.base import AdapterPlugin
from dbt.contracts.graph.nodes import BaseNode

from dbt.adapters.clickhouse.column import ClickHouseColumn # noqa
from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager # noqa
@@ -12,3 +13,20 @@
credentials=ClickHouseCredentials,
include_path=clickhouse.PACKAGE_PATH,
)


def get_materialization(self):
"""
Aliases the `materialized` configs `incremental` and `table` to `distributed_incremental` and
`distributed_table` respectively when the model's `is_distributed` config is set to true. This is
required so that dbt-core's microbatch functionality works with distributed models.
"""
materialized = self.config.materialized
is_distributed = self.config.extra.get('is_distributed')
if materialized in ('incremental', 'table') and is_distributed:
materialized = f"distributed_{materialized}"
return materialized


# patches a BaseNode method to allow setting `materialized` config overrides via dbt flags
BaseNode.get_materialization = get_materialization
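
A minimal sketch of what this monkey-patch changes. The node and config objects below are hypothetical SimpleNamespace stand-ins, not dbt's real classes; the behavior asserted follows directly from the patched method above:

from types import SimpleNamespace

def get_materialization(self):
    materialized = self.config.materialized
    is_distributed = self.config.extra.get('is_distributed')
    if materialized in ('incremental', 'table') and is_distributed:
        materialized = f"distributed_{materialized}"
    return materialized

# A distributed incremental model resolves to the distributed materialization...
node = SimpleNamespace(
    config=SimpleNamespace(materialized='incremental', extra={'is_distributed': True})
)
assert get_materialization(node) == 'distributed_incremental'

# ...while a non-distributed one keeps the standard name.
node.config.extra['is_distributed'] = False
assert get_materialization(node) == 'incremental'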
10 changes: 7 additions & 3 deletions dbt/adapters/clickhouse/impl.py
@@ -173,11 +173,15 @@ def can_exchange(self, schema: str, rel_type: str) -> bool:
return ch_db and ch_db.engine in ('Atomic', 'Replicated')

@available.parse_none
def should_on_cluster(self, materialized: str = '', engine: str = '') -> bool:
def should_on_cluster(
self, materialized: str = '', is_distributed: bool = False, engine: str = ''
) -> bool:
conn = self.connections.get_if_exists()
if conn and conn.credentials.cluster:
return ClickHouseRelation.get_on_cluster(conn.credentials.cluster, materialized, engine)
return ClickHouseRelation.get_on_cluster('', materialized, engine)
return ClickHouseRelation.get_on_cluster(
conn.credentials.cluster, materialized, is_distributed, engine
)
return ClickHouseRelation.get_on_cluster('', materialized, is_distributed, engine)

@available.parse_none
def calculate_incremental_strategy(self, strategy: str) -> str:
10 changes: 8 additions & 2 deletions dbt/adapters/clickhouse/relation.py
@@ -92,12 +92,17 @@ def should_on_cluster(self) -> bool:

@classmethod
def get_on_cluster(
cls: Type[Self], cluster: str = '', materialized: str = '', engine: str = ''
cls: Type[Self],
cluster: str = '',
materialized: str = '',
is_distributed: bool = False,
engine: str = '',
) -> bool:
if cluster.strip():
return (
materialized in ('view', 'dictionary')
or 'distributed' in materialized
or is_distributed
or 'Replicated' in engine
)

@@ -142,8 +147,9 @@ def create_from(

else:
materialized = relation_config.config.get('materialized') or ''
is_distributed = relation_config.config.get('extra', {}).get('is_distributed')
engine = relation_config.config.get('engine') or ''
Comment on lines +150 to 151 (Copilot AI, Apr 22, 2025):

The value for is_distributed may be passed as a string instead of a boolean, which could lead to unintended behavior in get_on_cluster. Consider adding an explicit conversion to boolean.

Suggested change:

is_distributed = relation_config.config.get('extra', {}).get('is_distributed')
# Ensure is_distributed is a boolean
is_distributed = str(is_distributed).lower() == "true" if is_distributed is not None else False
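
To see why the suggested coercion matters: any non-empty string is truthy in Python, so a config value that leaked through as the string 'False' would still count as distributed. A quick standalone check, not project code:

val = 'False'  # e.g. a value that arrived as a string from model config
assert bool(val) is True  # non-empty string is truthy: would wrongly enable ON CLUSTER
coerced = str(val).lower() == 'true' if val is not None else False
assert coerced is False  # the explicit coercion restores the intended meaning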

can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)
can_on_cluster = cls.get_on_cluster(cluster, materialized, is_distributed, engine)

return cls.create(
database='',
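
A self-contained sketch of the updated decision logic, written as a plain function rather than the real classmethod; it assumes the collapsed tail of the method falls through to False when no cluster is configured:

def get_on_cluster(cluster='', materialized='', is_distributed=False, engine=''):
    if cluster.strip():
        return (
            materialized in ('view', 'dictionary')
            or 'distributed' in materialized
            or is_distributed
            or 'Replicated' in engine
        )
    return False  # assumption: no cluster configured means no ON CLUSTER clause

assert get_on_cluster('my_cluster', 'incremental', is_distributed=True)       # new flag triggers it
assert get_on_cluster('my_cluster', 'distributed_incremental')                # aliased name still matches
assert not get_on_cluster('my_cluster', 'incremental', is_distributed=False)  # plain incremental does not
assert not get_on_cluster('', 'table', is_distributed=True)                   # no cluster, no clause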
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/adapters/relation.sql
@@ -5,7 +5,7 @@
{% endif %}

{%- set can_exchange = adapter.can_exchange(schema, type) %}
{%- set should_on_cluster = adapter.should_on_cluster(config.get('materialized'), engine_clause()) %}
{%- set should_on_cluster = adapter.should_on_cluster(config.get('materialized'), config.get('is_distributed'), engine_clause()) %}
{%- set new_relation = api.Relation.create(
database=None,
schema=schema,
@@ -54,6 +54,7 @@
{% call statement('main') %}
{{ create_view_as(view_relation, sql) }}
{% endcall %}
{% do to_drop.append(view_relation) %}

{% if existing_relation_local is none %}
-- No existing local table, recreate local and distributed tables
@@ -95,6 +96,14 @@
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
{% elif incremental_strategy == 'microbatch' %}
{%- if config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append(config.get("event_time") ~ " >= toDateTime('" ~ config.get("__dbt_internal_microbatch_event_time_start").strftime("%Y-%m-%d %H:%M:%S") ~ "')") %}
{%- endif -%}
{%- if config.get("__dbt_internal_microbatch_event_time_end") -%}
{% do incremental_predicates.append(config.get("event_time") ~ " < toDateTime('" ~ config.get("__dbt_internal_microbatch_event_time_end").strftime("%Y-%m-%d %H:%M:%S") ~ "')") %}
{%- endif -%}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
{% elif incremental_strategy == 'insert_overwrite' %}
{% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, True) %}
{% elif incremental_strategy == 'append' %}
@@ -150,6 +159,6 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
{{ return({'relations': [target_relation, target_relation_local]}) }}

{%- endmaterialization %}
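
Sketched in plain Python (an illustration of the Jinja above, not adapter code), the microbatch branch narrows the delete+insert step to dbt's internal batch window:

from datetime import datetime

def microbatch_predicates(event_time, start=None, end=None):
    predicates = []
    if start:
        predicates.append(f"{event_time} >= toDateTime('{start:%Y-%m-%d %H:%M:%S}')")
    if end:
        predicates.append(f"{event_time} < toDateTime('{end:%Y-%m-%d %H:%M:%S}')")
    return predicates

# For the 2020-01-01 daily batch, the delete+insert step is scoped to:
print(microbatch_predicates('event_time', datetime(2020, 1, 1), datetime(2020, 1, 2)))
# ["event_time >= toDateTime('2020-01-01 00:00:00')", "event_time < toDateTime('2020-01-02 00:00:00')"]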
@@ -4,11 +4,10 @@
_input_model_sql = """
{{
config(
materialized='table',
materialized='%s',
event_time='event_time'
)
}}

select 1 as id, toDateTime('2020-01-01 00:00:00') as event_time
union all
select 2 as id, toDateTime('2020-01-02 00:00:00') as event_time
@@ -20,14 +19,14 @@
{{
config(
materialized='incremental',
is_distributed=%s,
incremental_strategy='microbatch',
unique_key='id',
event_time='event_time',
batch_size='day',
begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
)
}}

select * from {{ ref('input_model') }}
"""

@@ -36,8 +35,24 @@ class TestMicrobatchIncremental(BaseMicrobatch):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": _input_model_sql,
"microbatch_model.sql": _microbatch_model_sql,
"input_model.sql": _input_model_sql % "table",
"microbatch_model.sql": _microbatch_model_sql % "False", # `is_distributed` param
Copilot AI (Apr 22, 2025):

The is_distributed parameter is provided as a string ('False' or 'True') in this test, but the adapter functions expect a boolean value. Consider converting the string to a boolean to avoid misinterpretation of truthy values.

Suggested change:

"microbatch_model.sql": _microbatch_model_sql % False,  # `is_distributed` param

}

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00'), (5, '2020-01-05 00:00:00')"


class TestMicrobatchDistributedIncremental(BaseMicrobatch):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": _input_model_sql % "distributed_table",
"microbatch_model.sql": _microbatch_model_sql % "True", # `is_distributed` param
}

@pytest.fixture(scope="class")