Skip to content

Commit

Permalink
Feature/add union data (#44)
Browse files Browse the repository at this point in the history
* started

* test and docs

* Docs and sources

* missed window ffunction

* readme docs for unioning sources

* docs

* changelog

* update package versions

* joe feedback

* readme tweaks following hubspot

* update readme

* swap source definition template

* working

* fix source relation

* fix

* bk

* constant expression

* constant expression errors

* add source_relation to audit_log

* docs

* readme

* joe feedback

* source check

* Apply suggestions from code review

Co-authored-by: fivetran-catfritz <[email protected]>

---------

Co-authored-by: fivetran-catfritz <[email protected]>
  • Loading branch information
fivetran-jamie and fivetran-catfritz authored Dec 2, 2024
1 parent 8b9ca18 commit 94c1b56
Show file tree
Hide file tree
Showing 47 changed files with 663 additions and 118 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
# dbt_zendesk_source v0.14.0
[PR #44](https://github.com/fivetran/dbt_zendesk_source/pull/44) includes the following updates:

## Feature Update: Run Package on Unioned Connectors
- This release supports running the package on multiple Zendesk sources at once! See the [README](https://github.com/fivetran/dbt_zendesk_source?tab=readme-ov-file#step-3-define-database-and-schema-variables) for details on how to leverage this feature.

> Please note: This is a **Breaking Change** in that we have a added a new field, `source_relation`, that points to the source connector from which the record originated.
## Documentation
- Added missing documentation for staging model columns.

# dbt_zendesk_source v0.13.0
[PR #55](https://github.com/fivetran/dbt_zendesk_source/pull/55) includes the following updates:

Expand Down
71 changes: 66 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,85 @@ dispatch:
search_order: ['spark_utils', 'dbt_utils']
```
### Step 2: Install the package
### Step 2: Install the package (skip if using `zendesk` transformation package)
Include the following zendesk_source package version in your `packages.yml` file **only if you are NOT also installing the [Zendesk Support transformation package](https://github.com/fivetran/dbt_zendesk)**. The transform package has a dependency on this source package.
> TIP: Check [dbt Hub](https://hub.getdbt.com/) for the latest installation instructions or [read the dbt docs](https://docs.getdbt.com/docs/package-management) for more information on installing packages.
```yaml
packages:
- package: fivetran/zendesk_source
version: [">=0.13.0", "<0.14.0"]
version: [">=0.14.0", "<0.15.0"]
```
### Step 3: Define database and schema variables
#### Option A: Single connector
By default, this package runs using your target database and the `zendesk` schema. If this is not where your Zendesk Support data is (for example, if your zendesk schema is named `zendesk_fivetran`), add the following configuration to your root `dbt_project.yml` file:

```yml
vars:
zendesk_database: your_destination_name
zendesk_schema: your_schema_name
```
> **Note**: When running the package with a single source connector, the `source_relation` column in each model will be populated with an empty string.

#### Option B: Union multiple connectors
If you have multiple Zendesk connectors in Fivetran and would like to use this package on all of them simultaneously, we have provided functionality to do so. For each source table, the package will union all of the data together and pass the unioned table into the transformations. The `source_relation` column in each model indicates the origin of each record.

To use this functionality, you will need to set the `zendesk_sources` variable in your root `dbt_project.yml` file:

```yml
# dbt_project.yml
vars:
zendesk_sources:
- database: connector_1_destination_name # Required
schema: connector_1_schema_name # Rquired
name: connector_1_source_name # Required only if following the step in the following subsection
- database: connector_2_destination_name
schema: connector_2_schema_name
name: connector_2_source_name
```

##### Recommended: Incorporate unioned sources into DAG
> *If you are running the package through [Fivetran Transformations for dbt Core™](https://fivetran.com/docs/transformations/dbt#transformationsfordbtcore), the below step is necessary in order to synchronize model runs with your Zendesk connectors.*

By default, this package defines one single-connector source, called `zendesk`, which will be disabled if you are unioning multiple connectors. This means that your DAG will not include your Zendesk sources, though the package will run successfully.

To properly incorporate all of your Zendesk connectors into your project's DAG:
1. Define each of your sources in a `.yml` file in your project. Utilize the following template for the `source`-level configurations, and, **most importantly**, copy and paste the table and column-level definitions from the package's `src_zendesk.yml` [file](https://github.com/fivetran/dbt_zendesk_source/blob/main/models/src_zendesk.yml#L15-L351).

```yml
# a .yml file in your root project
sources:
- name: <name> # ex: Should match name in zendesk_sources
schema: <schema_name>
database: <database_name>
loader: fivetran
loaded_at_field: _fivetran_synced
freshness: # feel free to adjust to your liking
warn_after: {count: 72, period: hour}
error_after: {count: 168, period: hour}
tables: # copy and paste from models/src_zendesk.yml - see https://support.atlassian.com/bitbucket-cloud/docs/yaml-anchors/ for how to use anchors to only do so once
```

> **Note**: If there are source tables you do not have (see [Step 4](https://github.com/fivetran/dbt_zendesk_source?tab=readme-ov-file#step-4-disable-models-for-non-existent-sources)), still include them in this source definition.

2. Set the `has_defined_sources` variable (scoped to the `zendesk_source` package) to `True`, like such:
```yml
# dbt_project.yml
vars:
zendesk_source:
has_defined_sources: true
```

### Step 4: Enable/Disable models for non-existent sources
This package takes into consideration that not every Zendesk Support account utilizes the `schedule`, `schedule_holiday`, `ticket_schedule`, `daylight_time`, `time_zone`, `audit_log`, `domain_name`, `user_tag`, `organization_tag`, or `ticket_form_history` features, and allows you to disable the corresponding functionality. By default, all variables' values are assumed to be `true`, except for `using_schedule_histories`. Add variables for only the tables you want to enable/disable:
> _This step is optional if you are unioning multiple connectors together in the previous step. The `union_data` macro will create empty staging models for sources that are not found in any of your Zendesk schemas/databases. However, you can still leverage the below variables if you would like to avoid this behavior._

This package takes into consideration that not every Zendesk Support account utilizes the `schedule`, `schedule_holiday`, `ticket_schedule`, `daylight_time`, `time_zone`, `audit_log`, `domain_name`, `user_tag`, `organization_tag`, or `ticket_form_history` features, and allows you to disable the corresponding functionality.

By default, all variables' values are assumed to be `true`, except for `using_schedule_histories`. Add variables for only the tables you want to enable/disable:

```yml
vars:
using_schedule_histories: True #Enable if you are using audit_logs for schedule histories
Expand Down Expand Up @@ -117,8 +178,8 @@ models:
+schema: my_new_schema_name # leave blank for just the target_schema
```

#### Change the source table references
If an individual source table has a different name than the package expects, add the table name as it appears in your destination to the respective variable:
### Change the source table references (only if using a single connector)
If an individual source table has a different name than the package expects, add the table name as it appears in your destination to the respective variable. This is not available when running the package on multiple unioned connectors.
> IMPORTANT: See this project's [dbt_project.yml](https://github.com/fivetran/dbt_zendesk_source/blob/main/dbt_project.yml) variable declarations to see the expected names.

```yml
Expand Down
6 changes: 4 additions & 2 deletions dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
config-version: 2
name: 'zendesk_source'
version: '0.13.0'
version: '0.14.0'
require-dbt-version: [">=1.3.0", "<2.0.0"]

models:
zendesk_source:
materialized: table
Expand Down Expand Up @@ -32,4 +33,5 @@ vars:

zendesk__ticket_passthrough_columns: []
zendesk__user_passthrough_columns: []
zendesk__organization_passthrough_columns: []
zendesk__organization_passthrough_columns: []
zendesk_sources: []
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

67 changes: 67 additions & 0 deletions macros/union/union_zendesk_connections.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{% macro union_zendesk_connections(connection_dictionary, single_source_name, single_table_name) %}

{{ adapter.dispatch('union_zendesk_connections', 'zendesk_source') (connection_dictionary, single_source_name, single_table_name) }}

{%- endmacro %}

{% macro default__union_zendesk_connections(connection_dictionary, single_source_name, single_table_name) %}

{% if connection_dictionary %}
{# For unioning #}
{%- set relations = [] -%}
{%- for connection in connection_dictionary -%}

{%- set relation=adapter.get_relation(
database=source(connection.name, single_table_name).database,
schema=source(connection.name, single_table_name).schema,
identifier=source(connection.name, single_table_name).identifier) if var('has_defined_sources', false)

else adapter.get_relation(
database=connection.database if connection.database else target.database,
schema=connection.schema if connection.schema else single_source_name,
identifier=single_table_name
)
-%}

{%- if relation is not none -%}
{%- do relations.append(relation) -%}
{%- endif -%}

{%- endfor -%}

{%- if relations != [] -%}
{{ zendesk_source.zendesk_union_relations(relations) }}
{%- else -%}
{% if execute and not var('fivetran__remove_empty_table_warnings', false) -%}
{{ exceptions.warn("\n\nPlease be aware: The " ~ single_source_name ~ "." ~ single_table_name ~ " table was not found in your schema(s). The Fivetran Data Model will create a completely empty staging model as to not break downstream transformations. To turn off these warnings, set the `fivetran__remove_empty_table_warnings` variable to TRUE (see https://github.com/fivetran/dbt_fivetran_utils/tree/releases/v0.4.latest#union_data-source for details).\n") }}
{% endif -%}
select
cast(null as {{ dbt.type_string() }}) as _dbt_source_relation
limit 0
{%- endif -%}

{% else %}
{# Not unioning #}
{%- set relation=adapter.get_relation(
database=source(single_source_name, single_table_name).database,
schema=source(single_source_name, single_table_name).schema,
identifier=source(single_source_name, single_table_name).identifier
) -%}

{%- if relation is not none -%}
select
{{ dbt_utils.star(from=source(single_source_name, single_table_name)) }}
from {{ source(single_source_name, single_table_name) }} as source_table

{% else %}
{% if execute and not var('fivetran__remove_empty_table_warnings', false) -%}
{{ exceptions.warn("\n\nPlease be aware: The " ~ single_source_name|upper ~ "." ~ single_table_name|upper ~ " table was not found in your schema(s). The Fivetran Data Model will create a completely empty staging model as to not break downstream transformations. To turn off these warnings, set the `fivetran__remove_empty_table_warnings` variable to TRUE (see https://github.com/fivetran/dbt_fivetran_utils/tree/releases/v0.4.latest#union_data-source for details).\n") }}
{% endif -%}

select
cast(null as {{ dbt.type_string() }}) as _dbt_source_relation
limit 0
{%- endif -%}
{% endif -%}

{%- endmacro %}
15 changes: 15 additions & 0 deletions macros/union/zendesk_source_relation.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{% macro apply_source_relation() -%}

{{ adapter.dispatch('apply_source_relation', 'zendesk_source') () }}

{%- endmacro %}

{% macro default__apply_source_relation() -%}

{% if var('zendesk_sources', []) != [] %}
, _dbt_source_relation as source_relation
{% else %}
, '{{ var("zendesk_database", target.database) }}' || '.'|| '{{ var("zendesk_schema", "zendesk") }}' as source_relation
{% endif %}

{%- endmacro %}
131 changes: 131 additions & 0 deletions macros/union/zendesk_union_relations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
{# Adapted from dbt_utils.union_relations() #}

{%- macro zendesk_union_relations(relations, aliases=none, column_override=none, include=[], exclude=[], source_column_name='_dbt_source_relation', where=none) -%}
{{ return(adapter.dispatch('zendesk_union_relations', 'zendesk_source')(relations, aliases, column_override, include, exclude, source_column_name, where)) }}
{% endmacro %}

{%- macro default__zendesk_union_relations(relations, aliases=none, column_override=none, include=[], exclude=[], source_column_name='_dbt_source_relation', where=none) -%}

{%- if exclude and include -%}
{{ exceptions.raise_compiler_error("Both an exclude and include list were provided to the `union` macro. Only one is allowed") }}
{%- endif -%}

{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. -#}
{%- if not execute %}
{{ return('') }}
{% endif -%}

{%- set column_override = column_override if column_override is not none else {} -%}

{%- set relation_columns = {} -%}
{%- set column_superset = {} -%}
{%- set all_excludes = [] -%}
{%- set all_includes = [] -%}

{%- if exclude -%}
{%- for exc in exclude -%}
{%- do all_excludes.append(exc | lower) -%}
{%- endfor -%}
{%- endif -%}

{%- if include -%}
{%- for inc in include -%}
{%- do all_includes.append(inc | lower) -%}
{%- endfor -%}
{%- endif -%}

{%- for relation in relations -%}

{%- do relation_columns.update({relation: []}) -%}

{%- do dbt_utils._is_relation(relation, 'zendesk_union_relations') -%}
{%- do dbt_utils._is_ephemeral(relation, 'zendesk_union_relations') -%}
{%- set cols = adapter.get_columns_in_relation(relation) -%}
{%- for col in cols -%}

{#- If an exclude list was provided and the column is in the list, do nothing -#}
{%- if exclude and col.column | lower in all_excludes -%}

{#- If an include list was provided and the column is not in the list, do nothing -#}
{%- elif include and col.column | lower not in all_includes -%}

{#- Otherwise add the column to the column superset -#}
{%- else -%}

{#- update the list of columns in this relation -#}
{%- do relation_columns[relation].append(col.column) -%}

{%- if col.column in column_superset -%}

{%- set stored = column_superset[col.column] -%}
{%- if col.is_string() and stored.is_string() and col.string_size() > stored.string_size() -%}

{%- do column_superset.update({col.column: col}) -%}

{%- endif %}

{%- else -%}

{%- do column_superset.update({col.column: col}) -%}

{%- endif -%}

{%- endif -%}

{%- endfor -%}
{%- endfor -%}

{%- set ordered_column_names = column_superset.keys() -%}
{%- set dbt_command = flags.WHICH -%}


{% if dbt_command in ['run', 'build'] %}
{% if (include | length > 0 or exclude | length > 0) and not column_superset.keys() %}
{%- set relations_string -%}
{%- for relation in relations -%}
{{ relation.name }}
{%- if not loop.last %}, {% endif -%}
{%- endfor -%}
{%- endset -%}

{%- set error_message -%}
There were no columns found to union for relations {{ relations_string }}
{%- endset -%}

{{ exceptions.raise_compiler_error(error_message) }}
{%- endif -%}
{%- endif -%}

{%- for relation in relations %}

(
select

{%- if source_column_name is not none %}
cast({{ dbt.string_literal(relation.database ~ '.' ~ relation.schema) }} as {{ dbt.type_string() }}) as {{ source_column_name }},
{%- endif %}

{% for col_name in ordered_column_names -%}

{%- set col = column_superset[col_name] %}
{%- set col_type = column_override.get(col.column, col.data_type) %}
{%- set col_name = adapter.quote(col_name) if col_name in relation_columns[relation] else 'null' %}
cast({{ col_name }} as {{ col_type }}) as {{ col.quoted }} {% if not loop.last %},{% endif -%}

{%- endfor %}

{# This alias is the only addition made to thr dbt_utils.union_relations() code. Avoids errors if the table is named a reserved keyword #}
from {{ aliases[loop.index0] if aliases else relation }} as unioned_relation_{{ loop.index }}

{% if where -%}
where {{ where }}
{%- endif %}
)

{% if not loop.last -%}
union all
{% endif -%}

{%- endfor -%}

{%- endmacro -%}
Loading

0 comments on commit 94c1b56

Please sign in to comment.