diff --git a/dbt/adapters/clickhouse/__init__.py b/dbt/adapters/clickhouse/__init__.py
index f466c55c..ec2a9eb3 100644
--- a/dbt/adapters/clickhouse/__init__.py
+++ b/dbt/adapters/clickhouse/__init__.py
@@ -1,4 +1,5 @@
 from dbt.adapters.base import AdapterPlugin
+from dbt.contracts.graph.nodes import BaseNode
 
 from dbt.adapters.clickhouse.column import ClickHouseColumn  # noqa
 from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager  # noqa
@@ -12,3 +13,20 @@
     credentials=ClickHouseCredentials,
     include_path=clickhouse.PACKAGE_PATH,
 )
+
+
+def get_materialization(self) -> str:
+    """
+    Return the node's effective materialization: `incremental` or `table` combined with a truthy
+    `is_distributed` model config is aliased to `distributed_incremental` or `distributed_table`. This is
+    required for compatibility of dbt-core microbatch functionalities with distributed models.
+    """
+    materialized = self.config.materialized
+    is_distributed = self.config.extra.get('is_distributed')
+    if materialized in ('incremental', 'table') and is_distributed:
+        materialized = f"distributed_{materialized}"
+    return materialized
+
+
+# Monkey-patch BaseNode so `is_distributed` models resolve to the `distributed_*` materializations
+BaseNode.get_materialization = get_materialization
diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py
index 41cdbba1..0c31dff1 100644
--- a/dbt/adapters/clickhouse/impl.py
+++ b/dbt/adapters/clickhouse/impl.py
@@ -173,11 +173,16 @@ def can_exchange(self, schema: str, rel_type: str) -> bool:
         return ch_db and ch_db.engine in ('Atomic', 'Replicated')
 
     @available.parse_none
-    def should_on_cluster(self, materialized: str = '', engine: str = '') -> bool:
+    def should_on_cluster(
+        self, materialized: str = '', engine: str = '', is_distributed: bool = False
+    ) -> bool:
+        # `is_distributed` comes last so positional `(materialized, engine)` callers keep working.
         conn = self.connections.get_if_exists()
         if conn and conn.credentials.cluster:
-            return ClickHouseRelation.get_on_cluster(conn.credentials.cluster, materialized, engine)
-        return ClickHouseRelation.get_on_cluster('', materialized, engine)
+            return ClickHouseRelation.get_on_cluster(
+                conn.credentials.cluster, materialized, engine, is_distributed
+            )
+        return ClickHouseRelation.get_on_cluster('', materialized, engine, is_distributed)
 
     @available.parse_none
     def calculate_incremental_strategy(self, strategy: str) -> str:
diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py
index 1a52838e..a931ca83 100644
--- a/dbt/adapters/clickhouse/relation.py
+++ b/dbt/adapters/clickhouse/relation.py
@@ -92,12 +92,17 @@ def should_on_cluster(self) -> bool:
 
     @classmethod
     def get_on_cluster(
-        cls: Type[Self], cluster: str = '', materialized: str = '', engine: str = ''
+        cls: Type[Self],
+        cluster: str = '',
+        materialized: str = '',
+        engine: str = '',
+        is_distributed: bool = False,
     ) -> bool:
         if cluster.strip():
             return (
                 materialized in ('view', 'dictionary')
                 or 'distributed' in materialized
+                or bool(is_distributed)
                 or 'Replicated' in engine
             )
 
@@ -142,8 +147,9 @@ def create_from(
 
         else:
             materialized = relation_config.config.get('materialized') or ''
+            is_distributed = relation_config.config.get('extra', {}).get('is_distributed')
             engine = relation_config.config.get('engine') or ''
-            can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)
+            can_on_cluster = cls.get_on_cluster(cluster, materialized, engine, is_distributed)
 
         return cls.create(
             database='',
diff --git a/dbt/include/clickhouse/macros/adapters/relation.sql b/dbt/include/clickhouse/macros/adapters/relation.sql
index 59ce37ab..f481196d 100644
--- a/dbt/include/clickhouse/macros/adapters/relation.sql
+++ b/dbt/include/clickhouse/macros/adapters/relation.sql
@@ -5,7 +5,7 @@
   {% endif %}
 
   {%- set can_exchange = adapter.can_exchange(schema, type) %}
-  {%- set should_on_cluster = adapter.should_on_cluster(config.get('materialized'), engine_clause()) %}
+  {%- set should_on_cluster = adapter.should_on_cluster(config.get('materialized'), engine_clause(), config.get('is_distributed')) %}
   {%- set new_relation = api.Relation.create(
       database=None,
       schema=schema,
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
index 93cdcfcb..8209a2d7 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
@@ -54,6 +54,7 @@
   {% call statement('main') %}
     {{ create_view_as(view_relation, sql) }}
   {% endcall %}
+  {% do to_drop.append(view_relation) %}
 
   {% if existing_relation_local is none %}
     -- No existing local table, recreate local and distributed tables
@@ -95,6 +96,14 @@
       {% set need_swap = true %}
     {% elif incremental_strategy == 'delete_insert' %}
       {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
+    {% elif incremental_strategy == 'microbatch' %}
+      {%- if config.get("__dbt_internal_microbatch_event_time_start") -%}
+        {% do incremental_predicates.append(config.get("event_time") ~ " >= toDateTime('" ~ config.get("__dbt_internal_microbatch_event_time_start").strftime("%Y-%m-%d %H:%M:%S") ~ "')") %}
+      {%- endif -%}
+      {%- if config.get("__dbt_internal_microbatch_event_time_end") -%}
+        {% do incremental_predicates.append(config.get("event_time") ~ " < toDateTime('" ~ config.get("__dbt_internal_microbatch_event_time_end").strftime("%Y-%m-%d %H:%M:%S") ~ "')") %}
+      {%- endif -%}
+      {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
     {% elif incremental_strategy == 'insert_overwrite' %}
       {% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, True) %}
     {% elif incremental_strategy == 'append' %}
@@ -150,6 +159,6 @@
 
   {{ run_hooks(post_hooks, inside_transaction=False) }}
 
-  {{ return({'relations': [target_relation]}) }}
+  {{ return({'relations': [target_relation, target_relation_local]}) }}
 
 {%- endmaterialization %}
\ No newline at end of file
diff --git a/tests/integration/adapter/incremental/test_incremental_microbatch.py b/tests/integration/adapter/incremental/test_incremental_microbatch.py
index 3e25e88b..9129aec6 100644
--- a/tests/integration/adapter/incremental/test_incremental_microbatch.py
+++ b/tests/integration/adapter/incremental/test_incremental_microbatch.py
@@ -4,11 +4,10 @@
 _input_model_sql = """
 {{
     config(
-        materialized='table',
+        materialized='%s',
         event_time='event_time'
     )
 }}
-
 select 1 as id, toDateTime('2020-01-01 00:00:00') as event_time
 union all
 select 2 as id, toDateTime('2020-01-02 00:00:00') as event_time
@@ -20,6 +19,7 @@
 {{
     config(
         materialized='incremental',
+        is_distributed=%s,
         incremental_strategy='microbatch',
         unique_key='id',
         event_time='event_time',
@@ -27,7 +27,6 @@
         begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
     )
 }}
-
 select * from {{ ref('input_model') }}
 """
 
@@ -36,8 +35,24 @@ class TestMicrobatchIncremental(BaseMicrobatch):
     @pytest.fixture(scope="class")
     def models(self):
         return {
-            "input_model.sql": _input_model_sql,
-            "microbatch_model.sql": _microbatch_model_sql,
+            "input_model.sql": _input_model_sql % "table",
+            "microbatch_model.sql": _microbatch_model_sql % "False",  # `is_distributed` param
+        }
+
+    @pytest.fixture(scope="class")
+    def insert_two_rows_sql(self, project) -> str:
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00'), (5, '2020-01-05 00:00:00')"
+
+
+class TestMicrobatchDistributedIncremental(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "input_model.sql": _input_model_sql % "distributed_table",
+            "microbatch_model.sql": _microbatch_model_sql % "True",  # `is_distributed` param
         }
 
     @pytest.fixture(scope="class")