Skip to content

Commit cc63d02

Browse files
Joshua Cook
authored and committed
move src code to package pipelines
1 parent bd03c91 commit cc63d02

File tree

9 files changed

+93
-14
lines changed

9 files changed

+93
-14
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@
66
**/.local
77
**/.pytest_cache
88
**/__pycache__
9+
**/tests/data/bronze
10+
**/tests/data/silver
11+
**/tests/data/checkpoints

env/docker/base/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
FROM jupyter/pyspark-notebook
22
COPY requirements.txt .
33
RUN pip install -r requirements.txt
4+
ENV PYTHONPATH ${PYTHONPATH}:/home/jovyan/src
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
FROM jupyter/pyspark-notebook
22
COPY requirements.txt .
33
RUN pip install -r requirements.txt
4+
ENV PYTHONPATH ${PYTHONPATH}:/home/jovyan/src

example/databricks_job_with_wheel/tests/spark/test_dataframe_operations.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import os
22
import pytest
3+
from shutil import rmtree
34
from pyspark.sql import DataFrame, SparkSession
45
from pyspark.sql.types import StructType
56

6-
from src.config import generate_paths, generate_schemas
7-
from src.operations import create_stream_writer, transform_bronze, transform_raw
8-
from src.utility import generate_spark_session, read_stream_json
7+
from pipelines.config import generate_paths, generate_schemas
8+
from pipelines.operations import create_stream_writer, transform_bronze, transform_raw
9+
from pipelines.utility import generate_spark_session, load_delta_table, read_stream_json
910

1011

1112
@pytest.fixture(scope="module")
@@ -20,17 +21,23 @@ def spark() -> SparkSession:
2021

2122
@pytest.fixture(scope="module")
2223
def raw_schema() -> StructType:
23-
yield generate_schemas("RAW_SCHEMA")
24+
yield generate_schemas("raw_schema")
25+
26+
27+
@pytest.fixture(scope="module")
28+
def bronze_path(env: str) -> StructType:
29+
path = generate_paths(env, "bronze")
30+
yield path
2431

2532

2633
@pytest.fixture(scope="module")
2734
def bronze_schema() -> StructType:
28-
yield generate_schemas("BRONZE_SCHEMA")
35+
yield generate_schemas("bronze_schema")
2936

3037

3138
@pytest.fixture(scope="module")
3239
def silver_schema() -> StructType:
33-
yield generate_schemas("SILVER_SCHEMA")
40+
yield generate_schemas("silver_schema")
3441

3542

3643
@pytest.fixture()
@@ -45,12 +52,28 @@ def bronze_df(spark: SparkSession, env: str, bronze_schema: StructType) -> DataF
4552
yield read_stream_json(spark, bronze_path, bronze_schema)
4653

4754

55+
@pytest.fixture()
56+
def silver_df(spark: SparkSession, env: str, silver_schema: StructType) -> DataFrame:
57+
test_silver_path = generate_paths(env, "test_silver")
58+
silver_path = generate_paths(env, "silver")
59+
silver_json_df = read_stream_json(spark, test_silver_path, silver_schema)
60+
(
61+
silver_json_df.writeStream.format("delta")
62+
.partitionBy("p_eventdate")
63+
.option("path", silver_path)
64+
.save()
65+
)
66+
yield load_delta_table(spark, silver_path)
67+
rmtree(silver_path)
68+
69+
4870
class TestSparkDataframeOperations:
49-
def test_create_stream_write(self, env, spark, raw_df, bronze_schema):
71+
def test_create_stream_write(self, env, spark, raw_df, bronze_path, bronze_schema):
5072
transformed_raw_df = transform_raw(spark, raw_df)
5173
bronze_checkpoint = generate_paths(env, "bronze_checkpoint")
5274
raw_to_bronze_writer = create_stream_writer(
5375
dataframe=transformed_raw_df,
76+
path=bronze_path,
5477
checkpoint=bronze_checkpoint,
5578
name="write_raw_to_bronze",
5679
partition_column="p_ingestdate",
@@ -64,3 +87,6 @@ def test_transform_raw(self, spark, raw_df, bronze_schema):
6487
def test_transform_bronze(self, spark, bronze_df, silver_schema):
6588
transformed_bronze_df = transform_bronze(spark, bronze_df)
6689
assert transformed_bronze_df.schema == silver_schema
90+
91+
def test_prepare_interpolation_dataframe(self, spark, silver_df):
92+
pass

example/databricks_job_with_wheel/tests/spark/test_integrations.py

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
from pyspark.sql import DataFrame, SparkSession
55
from pyspark.sql.types import StructType
66

7-
from src.config import generate_paths, generate_schemas
8-
from src.operations import create_stream_writer, transform_raw
9-
from src.utility import (
7+
from pipelines.config import generate_paths, generate_schemas
8+
from pipelines.operations import create_stream_writer, transform_raw, transform_bronze
9+
from pipelines.utility import (
1010
generate_spark_session,
1111
load_delta_table,
1212
read_stream_json,
@@ -24,6 +24,12 @@ def spark() -> SparkSession:
2424
yield generate_spark_session()
2525

2626

27+
@pytest.fixture()
28+
def bronze_df(spark: SparkSession, env: str, bronze_schema: StructType) -> DataFrame:
29+
bronze_path = generate_paths(env, "test_bronze")
30+
yield read_stream_json(spark, bronze_path, bronze_schema)
31+
32+
2733
@pytest.fixture(scope="module")
2834
def bronze_checkpoint(env: str) -> StructType:
2935
path = generate_paths(env, "bronze_checkpoint")
@@ -40,13 +46,37 @@ def bronze_path(env: str) -> StructType:
4046

4147
@pytest.fixture(scope="module")
4248
def bronze_schema() -> StructType:
43-
yield generate_schemas("BRONZE_SCHEMA")
49+
yield generate_schemas("bronze_schema")
50+
51+
52+
@pytest.fixture(scope="module")
53+
def raw_schema() -> StructType:
54+
yield generate_schemas("raw_schema")
4455

4556

4657
@pytest.fixture()
47-
def raw_df(spark: SparkSession, env: str) -> DataFrame:
58+
def raw_df(spark: SparkSession, env: str, raw_schema: StructType) -> DataFrame:
4859
raw_path = generate_paths(env, "test_raw")
49-
yield read_stream_json(spark, raw_path)
60+
yield read_stream_json(spark, raw_path, raw_schema)
61+
62+
63+
@pytest.fixture(scope="module")
64+
def silver_checkpoint(env: str) -> StructType:
65+
path = generate_paths(env, "silver_checkpoint")
66+
yield path
67+
rmtree(path)
68+
69+
70+
@pytest.fixture(scope="module")
71+
def silver_path(env: str) -> StructType:
72+
path = generate_paths(env, "silver")
73+
yield path
74+
rmtree(path)
75+
76+
77+
@pytest.fixture(scope="module")
78+
def silver_schema() -> StructType:
79+
yield generate_schemas("silver_schema")
5080

5181

5282
class TestSparkIntegrations:
@@ -57,11 +87,29 @@ def test_raw_to_bronze(
5787
transformed_raw_df = transform_raw(spark, raw_df)
5888
raw_to_bronze_writer = create_stream_writer(
5989
dataframe=transformed_raw_df,
90+
path=bronze_path,
6091
checkpoint=bronze_checkpoint,
6192
name=stream_name,
6293
partition_column="p_ingestdate",
6394
)
64-
raw_to_bronze_writer.start(bronze_path)
95+
raw_to_bronze_writer.start()
6596

6697
until_stream_is_ready(spark, stream_name)
6798
assert load_delta_table(spark, bronze_path).count() == 7320
99+
100+
def test_bronze_to_silver(
101+
self, env, spark, bronze_df, silver_checkpoint, silver_path, silver_schema
102+
):
103+
stream_name = "write_bronze_to_silver"
104+
transformed_bronze_df = transform_bronze(spark, bronze_df)
105+
bronze_to_silver_writer = create_stream_writer(
106+
dataframe=transformed_bronze_df,
107+
path=silver_path,
108+
checkpoint=silver_checkpoint,
109+
name=stream_name,
110+
partition_column="p_eventdate",
111+
)
112+
bronze_to_silver_writer.start()
113+
114+
until_stream_is_ready(spark, stream_name)
115+
assert load_delta_table(spark, silver_path).count() == 7320

0 commit comments

Comments
 (0)