This repository was archived by the owner on Apr 15, 2024. It is now read-only.
29 commits
831a7e6  Add initial medallion-s3 pipeline example - wip (laguirre-cs, Mar 25, 2024)
5adf1ad  Add metadata.json and minor change to config file (laguirre-cs, Mar 25, 2024)
6fe8340  Rename folder from medallion-s3 to medallion (laguirre-cs, Mar 25, 2024)
158653f  Add base README and try disable examples via the metadata.json file (laguirre-cs, Mar 27, 2024)
7edc936  Add resourceType as a property to the metadata.json file (laguirre-cs, Mar 27, 2024)
5d0dcc6  Add Skill resourceType to templates for publish existing examples (laguirre-cs, Mar 27, 2024)
add6c89  Remove pipeline template (laguirre-cs, Mar 27, 2024)
c2b55cb  Add back template (laguirre-cs, Mar 27, 2024)
e8ff3d7  Add warning to README (laguirre-cs, Mar 27, 2024)
efb10a3  Remove template - trying to fix case issue (laguirre-cs, Mar 27, 2024)
6b4c817  Add back pipeline template (laguirre-cs, Mar 27, 2024)
ff8c98d  Remove folder (laguirre-cs, Mar 28, 2024)
5538782  Add back renamed folder (laguirre-cs, Mar 28, 2024)
f0d8b67  Update README (laguirre-cs, Mar 28, 2024)
ae5d866  Move README to be alongside pipeline code, and rename block modules (laguirre-cs, Mar 28, 2024)
74debdc  Rename Blocks to not have Test in name and reformat code slightly (laguirre-cs, Apr 2, 2024)
245a17a  Update block input/output names, remove any references to Mongo, and … (laguirre-cs, Apr 5, 2024)
112b7bf  Add seed data for local usage (laguirre-cs, Apr 5, 2024)
6aed000  Remove references to mtn within dbt project folder (laguirre-cs, Apr 5, 2024)
9aef248  Rename folder dbt project folder name - will match pipeline name (laguirre-cs, Apr 5, 2024)
82b748c  Fix variable name usage in specs (laguirre-cs, Apr 5, 2024)
af9ff50  Add default config files for running template as a Skill (laguirre-cs, Apr 5, 2024)
8f8e77b  Fix SDK version in default spark config for local usage (laguirre-cs, Apr 8, 2024)
d766417  Commit working version of Pipeline - use S3Filestream instead of Kafk… (laguirre-cs, Apr 9, 2024)
e7d8581  Add ability to use mongo for gold ODS (laguirre-cs, Apr 9, 2024)
6be782d  Remove dbt packages - can be installed after the template is created (laguirre-cs, Apr 9, 2024)
30b3380  Add quotes around string (laguirre-cs, Apr 9, 2024)
62962f7  Remove double bracket that was causing dbt_project.yml to not be temp… (laguirre-cs, Apr 9, 2024)
e4d35a2  Add local option (non-streaming) option for bronze block (laguirre-cs, Apr 10, 2024)
25 changes: 25 additions & 0 deletions README.md
@@ -0,0 +1,25 @@
# Cortex Code Templates

This repository contains templates for resources (Skills, Agents, Pipelines, etc.) in the Cortex Fabric platform. These
templates are designed to work with the `workspaces` feature of the [Cortex CLI](https://github.com/CognitiveScale/cortex-cli).

Templates are identified by the presence of a `metadata.json` within the folder.

```json
{
  "name": "<template-name>",
  "title": "<template-title>",
  "description": "<template-description>",
  "tags": ["list", "of", "tags", "applied", "to", "template"],
  "enabled": true,
  "resourceType": "Skill"
}
```

The `resourceType` can be one of `"Skill"` or `"Pipeline"`:
* Skill templates have access to the `{{skillname}}` variable in text files, whereas `__skillName__` can be used
  in file paths
* Pipeline templates have access to the `{{pipelinename}}` variable in text files, whereas `__pipelineName__` can
  be used in file paths
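
For example, the `medallion` Pipeline template in this repository is laid out as shown below (a sketch limited to the files in this change); when a pipeline is generated from the template, `__pipelinename__` in paths and `{{pipelinename}}` in file contents are replaced with the chosen pipeline name:

```
pipelineTemplates/medallion/
├── metadata.json                      # marks the folder as a template
└── pipelines/
    └── __pipelinename__/
        ├── config.yml                 # uses the {{pipelinename}} variable
        └── blocks/
            ├── bronze.py
            ├── gold.py
            └── silver.py
```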

:warning: **WARNING: The filename must be exactly `metadata.json`; any other variation will be ignored.**
8 changes: 8 additions & 0 deletions pipelineTemplates/medallion/metadata.json
@@ -0,0 +1,8 @@
{
  "name": "medallion",
  "title": "Medallion Pipeline Template",
  "description": "A Pipeline template that implements a medallion architecture",
  "tags": ["template", "pipelines", "medallion"],
  "enabled": true,
  "resourceType": "Pipeline"
}
56 changes: 56 additions & 0 deletions pipelineTemplates/medallion/pipelines/__pipelinename__/blocks/bronze.py
@@ -0,0 +1,56 @@
from pyspark.sql.streaming import StreamingQuery
from sensa_data_pipelines.executors.pyspark.mixins.profiles_sdk_mixins import (
    ProfilesSdkMixin,
    DataEndpointProfilesSdkMixin,
)
from sensa_data_pipelines.pipeline_model import (
    SensaDataModelConfig,
    SensaConnectionConfig,
)
from sensa_data_pipelines.executors.pyspark.streaming import StreamingBlock

INPUT_NAME = "bronze_input"
OUTPUT_NAME = "to_bronze"
USE_STREAMING = False


class BronzeBlock(StreamingBlock, ProfilesSdkMixin, DataEndpointProfilesSdkMixin):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def execute_stream(self, **kwargs) -> StreamingQuery:
        connection = self.get_input_config(INPUT_NAME, SensaConnectionConfig)
        bronze_model = self.get_output_config(OUTPUT_NAME, SensaDataModelConfig)
        if USE_STREAMING:
            # NOTE: This requires streaming input (S3FileStream, GCPFileStream, KafkaStream, etc.).
            # Need to include instructions for this, or add the ability to use the local-csv file.
            stream_pair = (
                self.sensa.readStream().readConnection(connection.endpoint.project, connection.endpoint.name).load()
            )
            # write the starting dataframe, then write the streaming data
            static_df = stream_pair.getStaticDf()
            (
                static_df.write.format("delta")
                .mode("overwrite")
                .option("checkpointLocation", bronze_model.paths.checkpoint_path)
                .save(bronze_model.paths.location)
            )
            return (
                stream_pair.getStreamDf()
                .writeStream.format("delta")
                .queryName(bronze_model.paths.segment)
                .outputMode("append")
                .option("checkpointLocation", bronze_model.paths.checkpoint_path)
                .trigger(processingTime="5 seconds")
                .start(bronze_model.paths.location)
            )
        else:
            # Use a non-streaming data source (DeltaTable) and write Bronze data
            static_df = self.sensa.read().connection(connection.endpoint.project, connection.endpoint.name).load()
            (
                static_df.write.format("delta")
                .mode("append")
                .option("checkpointLocation", bronze_model.paths.checkpoint_path)
                .save(bronze_model.paths.location)
            )
            return "done"
111 changes: 111 additions & 0 deletions pipelineTemplates/medallion/pipelines/__pipelinename__/blocks/gold.py
@@ -0,0 +1,111 @@
import os
from pyspark.sql.streaming import StreamingQuery

from sensa_data_pipelines.executors.pyspark.mixins.profiles_sdk_mixins import (
    ProfilesSdkMixin,
    DataEndpointProfilesSdkMixin,
)
from sensa_data_pipelines.executors.pyspark.mixins.pyspark_mixins import (
    PySparkMixin,
)
from sensa_data_pipelines.pipeline_model import (
    SensaDataModelConfig,
    SensaProfileSchemaConfig,
)
from sensa_data_pipelines.executors.pyspark.streaming import StreamingBlock

INPUT_NAME = "from_profile_schema"
OUTPUT_NAME = "to_gold_checkpoint"

# Collection of fields to aggregate as their own DeltaTable
sub_tables = [
    "contactMedium",
    "customer",
    "customer360",
    "individual",
    "loyalty",
    "partyInteraction",
    "resource",
    "usage",
]


class GoldBlock(StreamingBlock, PySparkMixin, ProfilesSdkMixin, DataEndpointProfilesSdkMixin):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def write_delta(self, batch, epoch):
        """Writes the given batch to separate Delta tables."""
        batch.persist()
        format_ = "delta"
        gold_destination = self.ods_conn.paths.location
        gold_checkpoint = self.ods_conn.paths.checkpoint_path
        # Collect dedicated tables
        for f in sub_tables:
            selection = f"{f}.*"  # using wildcard
            # Resolve the paths to the destination & checkpoint files for the subtable by appending
            # the subtable name
            destination = os.path.dirname(gold_checkpoint) + "-" + f
            checkpoint = os.path.join(destination, os.path.basename(gold_checkpoint))
            batch.select(selection).write.format(format_).mode("append").option(
                "checkpointLocation", checkpoint
            ).option("delta.enableChangeDataFeed", "false").save(destination)
            self.logger.info(
                "Appended batch '%s' selection '%s' to '%s' (checkpoint: %s)",
                epoch,
                selection,
                destination,
                checkpoint,
            )
        # Write the entire batch to its own DeltaTable ("gold")
        (
            batch.write.format(format_)
            .mode("append")
            .option("checkpointLocation", gold_checkpoint)
            .option("delta.enableChangeDataFeed", "false")
            .save(gold_destination)
        )
        self.logger.info(
            "Appended batch '%s' to aggregate DeltaTable '%s' (checkpoint: %s)",
            epoch,
            gold_destination,
            gold_checkpoint,
        )
        batch.unpersist()

    def write_mongo(self, batch, epoch):
        """Writes the given batch to separate tables in MongoDB."""
        # Write to "external" Mongo (i.e. not a Cortex Connection)
        batch.persist()
        format_ = "mongo"
        mongo_base_uri = "mongodb://localhost:27017/gold"
        # Collect dedicated tables
        for f in sub_tables:
            selection = f"{f}.*"  # using wildcard
            table_uri = f"{mongo_base_uri}.{f}"
            batch.select(selection).write.format(format_).mode("append").option(
                "spark.mongodb.output.uri", table_uri
            ).save()
            self.logger.info("Wrote subset of batch '%s' to table '%s'", epoch, f)
        # Write the entire batch to its own table (All)
        batch.write.format(format_).mode("append").option("spark.mongodb.output.uri", f"{mongo_base_uri}.All").save()
        self.logger.info("Wrote batch '%s' to table All", epoch)
        batch.unpersist()

    def execute_stream(self, **kwargs) -> StreamingQuery:
        profile_schema = self.get_input_config(INPUT_NAME, SensaProfileSchemaConfig)
        readStream = (
            self.spark.readStream.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", 0)
            .load(profile_schema.paths.location)
        )
        # Store a reference to the Output as a property of self (make it usable across methods)
        self.ods_conn = self.get_output_config(OUTPUT_NAME, SensaDataModelConfig)
        return (
            readStream.writeStream.queryName(self.ods_conn.paths.segment)
            .option("checkpointLocation", self.ods_conn.paths.checkpoint_path)
            .outputMode("append")
            .foreachBatch(self.write_delta)
            .start()
        )
67 changes: 67 additions & 0 deletions pipelineTemplates/medallion/pipelines/__pipelinename__/blocks/silver.py
@@ -0,0 +1,67 @@
from sensa_data_pipelines.executors.pyspark.mixins.profiles_sdk_mixins import (
    ProfilesSdkMixin,
    DataEndpointProfilesSdkMixin,
    OpenMetadataProfilesSdkMixin,
)
from sensa_data_pipelines.executors.pyspark.mixins.pyspark_mixins import (
    PySparkMixin,
    DbtPySparkMixin,
)
from sensa_data_pipelines.integrations.open_metadata.open_metadata_mixin import (
    OpenMetadataJobConfig,
    OpenMetadataJobTypes,
)
from sensa_data_pipelines.pipeline_model import (
    SensaDataModelConfig,
    SensaDataSourceConfig,
    SensaProfileSchemaConfig,
)
from sensa_data_pipelines.executors.pyspark.streaming import StreamingBlock

INPUT_NAME = "from_bronze"
OUTPUT_NAME = "to_data_source"


class SilverBlock(
    StreamingBlock,
    DbtPySparkMixin,
    PySparkMixin,
    ProfilesSdkMixin,
    DataEndpointProfilesSdkMixin,
    OpenMetadataProfilesSdkMixin,
):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def execute(self, **kwargs) -> str:
        bronze_model = self.get_input_config(INPUT_NAME, SensaDataModelConfig)
        data_source = self.get_output_config(OUTPUT_NAME, SensaDataSourceConfig)
        profile_schema = self.get_output_config("to_profile_schema", SensaProfileSchemaConfig)
        dbt_vars = dict(
            {
                "bronze-location": bronze_model.paths.location,
                "path-silver": data_source.paths.location_root,
                "alias-silver": data_source.paths.segment,
                "path-gold": profile_schema.paths.location_root,
                "alias-gold": profile_schema.paths.segment,
            }
        )
        run_results = self.run_dbt(dbt_vars=dbt_vars)
        self.logger.info("Completed dbt Job")
        if not run_results.success:
            self.logger.error("DBT run failed! Results have been uploaded to Managed Content!")

        # TODO: Need to make this conditional based on local vs remote context
        should_run_om_job = self.variables.get("RUN_OPEN_METADATA_JOB") == "True"
        if should_run_om_job:
            self.logger.info("Running OpenMetadata Job")
            self.run_open_metadata_job(
                [
                    OpenMetadataJobConfig(OpenMetadataJobTypes.DELTA_LAKE_INGEST, None),
                    OpenMetadataJobConfig(OpenMetadataJobTypes.DELTA_LAKE_PROFILE, None),
                    OpenMetadataJobConfig(OpenMetadataJobTypes.DBT, None),
                ]
            )
        else:
            self.logger.info("Skipping OpenMetadata Job. Set RUN_OPEN_METADATA_JOB to 'True' to change this behavior.")
        return "ran"
103 changes: 103 additions & 0 deletions pipelineTemplates/medallion/pipelines/__pipelinename__/config.yml
@@ -0,0 +1,103 @@
apiVersion: v3
kind: PipelineConfig
metadata:
  title: "{{pipelinename}} - Medallion Pipeline Template"
  name: {{pipelinename}}
spec:
  # Global Variables - defaults for all profiles
  variables:
    - name: OPEN_METADATA_REST_API
      value: "http://openmetadata.cortex.svc.cluster.local:8585/api" # cluster internal API
    - name: DBT_EXECUTION_STRING
      value: build
  profiles:
    # Local Profile - for local usage. It uses a JDBC connection in the local catalog and sets an arbitrary value
    # for the OpenMetadata token because it is stubbed during tests as the service is not available
    - name: local
      variables:
        - name: PROJECT
          value: test-project
        - name: INPUT_CONNECTION_NAME
          value: local-csv
        - name: DATA_SOURCE_NAME
          value: test-datasource
        - name: PROFILE_NAME
          value: test-profileschema
        - name: OPEN_METADATA_TOKEN
          value: 'xxxxx'
        - name: RUN_OPEN_METADATA_JOB
          value: "False"
    # Default profile - for use in a Sensa cluster (use at your own risk).
    - name: default
      variables:
        - name: OPEN_METADATA_TOKEN
          value: '<REPLACE_ME>'
        - name: INPUT_CONNECTION_NAME
          value: '<REPLACE_ME>'
        - name: RUN_OPEN_METADATA_JOB
          value: "True"
  blocks:
    - name: bronze
      title: Bronze
      class: BronzeBlock
      package: bronze
      executor: pyspark
      type: streaming
      inputs:
        - name: bronze_input
          sensa:
            connection:
              project: ${PROJECT}
              name: ${INPUT_CONNECTION_NAME}
      outputs:
        - name: to_bronze
          sensa:
            model:
              project: ${PROJECT}
              name: bronze
    - name: dbt_silver_gold
      # TODO: why does this run both Silver & Gold - it seems a little counterintuitive
      title: DBT Silver And Gold
      class: SilverBlock
      package: silver
      executor: pyspark
      type: dbt
      dbt:
        projectDir: dbt/{{pipelinename}}
        profileDir: dbt
        runCommand: ${DBT_EXECUTION_STRING}
      inputs:
        - name: from_bronze
          sensa:
            model:
              project: ${PROJECT}
              name: bronze
      outputs:
        - name: to_data_source
          sensa:
            dataSource:
              project: ${PROJECT}
              name: ${DATA_SOURCE_NAME}
        - name: to_profile_schema
          sensa:
            profile:
              project: ${PROJECT}
              name: ${PROFILE_NAME}
    - name: gold
      title: Gold
      class: GoldBlock
      package: gold
      executor: pyspark
      type: streaming
      inputs:
        - name: from_profile_schema
          sensa:
            profile:
              project: ${PROJECT}
              name: ${PROFILE_NAME}
      outputs:
        - name: to_gold_checkpoint
          sensa:
            model:
              project: ${PROJECT}
              name: gold
@@ -0,0 +1 @@
id: 0d2c6e37-16c9-4116-92ce-08e1e08e30d9
@@ -0,0 +1,15 @@
Welcome to your new dbt project!

### Using the starter project

Try running the following commands:
- dbt run
- dbt test
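
The pipeline's silver block passes several variables to dbt at runtime (`bronze-location`, `path-silver`, `alias-silver`, `path-gold`, and `alias-gold`). When running dbt by hand, the same variables can be supplied with `--vars`; for example (a sketch with placeholder paths, not values from this project):
- dbt build --vars '{"bronze-location": "/tmp/bronze", "path-silver": "/tmp/silver", "alias-silver": "silver", "path-gold": "/tmp/gold", "alias-gold": "gold"}'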


### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices