Add some data validation examples (#112)
Add some data validation examples including:

pandera
Nike's spark-expectations
basic WAP (write-audit-publish; a sketch of the pattern follows the commit list below)
Target's data-validator

Consisting of:

* Add pandera req

* Start adding a pandera example

* Formatting

* Format

* Formatting

* Add an example for the target data validator.

* Flesh out the target example and add it to CI.

* Start working on adding spark expectations from the Nike folks.

* Update rule

* Play around with spark expectations

* Update the sample rule

* Fix mismatched scala versions

* Install IcebergSparkSessionExtensions

* Start adding a pure SQL WAP example.

* Switch to using SparkSession over the legacy SQLContext.

* Format python ex

* Style cleanup

* Clone the data-validator repo as well.

* Comment out the not-yet-working fast-forward (FF) in the SQL example.

* Fix examples

* Skip CI on the Target data-validator for now (nested build issue; it should go away once the PR is merged anyway).

* SparkSession imports are good.
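For reference, since the pure SQL WAP file itself is not among the hunks shown below: write-audit-publish with Iceberg stages writes on an audit branch, validates them, and only then publishes to main. The sketch below is a hedged illustration of that flow (not the repository's file), driven through spark.sql against an assumed Iceberg catalog named "local" and a hypothetical local.bonuses table; the final fast_forward publish is the step the commit notes was still commented out.

from pyspark.sql import SparkSession

# Assumes an Iceberg catalog named "local" (with the Iceberg SQL extensions
# installed) and an existing table local.bonuses -- both hypothetical here.
spark = SparkSession.builder.master("local[4]").getOrCreate()

# Write: stage new rows on an audit branch instead of main.
spark.sql("ALTER TABLE local.bonuses SET TBLPROPERTIES ('write.wap.enabled'='true')")
spark.sql("ALTER TABLE local.bonuses CREATE BRANCH IF NOT EXISTS audit")
spark.conf.set("spark.wap.branch", "audit")

raw = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)
raw.select("CompanyNumber", "MaleBonusPercent", "FemaleBonusPercent") \
    .createOrReplaceTempView("new_bonuses")
spark.sql("INSERT INTO local.bonuses SELECT * FROM new_bonuses")

# Audit: validate the staged rows before anyone reads them from main.
bad = spark.sql(
    "SELECT count(*) AS c FROM local.bonuses WHERE CompanyNumber IS NULL"
).first()["c"]

# Publish: fast-forward main to the audited branch (the FF step this commit
# left commented out in its SQL version).
if bad == 0:
    spark.sql("CALL local.system.fast_forward('local.bonuses', 'main', 'audit')")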
holdenk authored Sep 21, 2023
1 parent 5ff04dc commit 4e109c4
Showing 17 changed files with 350 additions and 21 deletions.
33 changes: 33 additions & 0 deletions .github/workflows/ci.yml
@@ -55,9 +55,36 @@ jobs:
spark*.tgz
iceberg*.jar
key: spark-artifacts
- name: Cache Data
uses: actions/cache@v3
with:
path: |
data/fetched/*
key: data-fetched
- name: Run sql examples
run:
./run_sql_examples.sh
run-target-examples:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Cache Spark and friends
uses: actions/cache@v3
with:
path: |
spark*.tgz
iceberg*.jar
key: spark-artifacts
- name: Cache Data
uses: actions/cache@v3
with:
path: |
data/fetched/*
key: data-fetched
- name: Run the target validator example
run:
cd target-validator; ./runme.sh
run-pyspark-examples:
runs-on: ubuntu-latest
steps:
@@ -70,6 +97,12 @@ jobs:
spark*.tgz
iceberg*.jar
key: spark-artifacts
- name: Cache Data
uses: actions/cache@v3
with:
path: |
data/fetched/*
key: data-fetched
- name: Run PySpark examples
run:
./run_pyspark_examples.sh
7 changes: 6 additions & 1 deletion .gitignore
@@ -78,4 +78,9 @@ metastore_db/

# Misc internal stuff
sql/*.sql.out
python/examples/*.py.out
python/examples/*.py.out
data/fetched/*

# more python
pyspark_venv.tar.gz
pyspark_venv/
@@ -9,7 +9,7 @@ import scala.util.Random

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types._

import com.highperformancespark.examples.dataframe.HappyPandas.PandaInfo
@@ -68,15 +68,15 @@ class HappyPandasTest extends AnyFunSuite with DataFrameSuiteBase {
val expectedDf = createDF(expectedList, ("place", StringType),
("percentHappy", DoubleType))

val inputDF = sqlContext.createDataFrame(pandaInfoList)
val inputDF = spark.createDataFrame(pandaInfoList)
val resultDF = HappyPandas.happyPandasPercentage(inputDF)

assertDataFrameApproximateEquals(expectedDf, resultDF, 1E-5)
}
//end::approxEqualDataFrames[]

test("verify approx by hand") {
val inputDF = sqlContext.createDataFrame(pandaInfoList)
val inputDF = spark.createDataFrame(pandaInfoList)
val resultDF = HappyPandas.happyPandasPercentage(inputDF)
val resultRows = resultDF.collect()

@@ -94,7 +94,7 @@ class HappyPandasTest extends AnyFunSuite with DataFrameSuiteBase {
}

test("test encode Panda type") {
val inputDF = sqlContext.createDataFrame(rawPandaList)
val inputDF = spark.createDataFrame(rawPandaList)
val resultDF = HappyPandas.encodePandaType(inputDF)

val expectedRows = List(Row(10L, 0), Row(11L, 1))
@@ -107,7 +107,7 @@ class HappyPandasTest extends AnyFunSuite with DataFrameSuiteBase {
//tag::exactEqualDataFrames[]
test("verify exact equality") {
// test minHappyPandas
val inputDF = sqlContext.createDataFrame(pandaInfoList)
val inputDF = spark.createDataFrame(pandaInfoList)
val result = HappyPandas.minHappyPandas(inputDF, 2)
val resultRows = result.collect()

@@ -117,12 +117,12 @@ class HappyPandasTest extends AnyFunSuite with DataFrameSuiteBase {
//end::exactEqualDataFrames[]

test("test happyPandasPlaces") {
val inputDF = sqlContext.createDataFrame(pandaInfoList)
val inputDF = spark.createDataFrame(pandaInfoList)
val resultDF = HappyPandas.happyPandasPlaces(inputDF)

val expectedRows = List(PandaInfo(toronto, "giant", 1, 2),
PandaInfo(sandiego, "red", 2, 3))
val expectedDF = sqlContext.createDataFrame(expectedRows)
val expectedDF = spark.createDataFrame(expectedRows)

assertDataFrameEquals(expectedDF, resultDF)
}
5 changes: 5 additions & 0 deletions data/project.csv
@@ -0,0 +1,5 @@
creator,projectname,stars
holdenk,spark-upgrade,17
krisnova,rust-nova,71
kbendick,MongoMart,6
mateiz,spark,36600
12 changes: 10 additions & 2 deletions env_setup.sh
@@ -4,6 +4,7 @@
# Download Spark and iceberg if not present
SPARK_MAJOR="3.4"
SPARK_VERSION=3.4.1
SCALA_VERSION="2.12"
HADOOP_VERSION="3"
SPARK_PATH="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
@@ -12,15 +13,18 @@ if [ ! -f "${SPARK_FILE}" ]; then
wget "https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}" &
fi
# Download Iceberg if not present
ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_2.13-${ICEBERG_VERSION}.jar"
ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_${SCALA_VERSION}-${ICEBERG_VERSION}.jar"
if [ ! -f "${ICEBERG_FILE}" ]; then
wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_2.13/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_${SCALA_VERSION}/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
fi
wait
# Setup the env
if [ ! -d "${SPARK_PATH}" ]; then
tar -xf ${SPARK_FILE}
fi

export SPARK_HOME="${SPARK_PATH}"

if [ ! -f "${SPARK_PATH}/jars/${ICEBERG_FILE}" ]; then
cp "${ICEBERG_FILE}" "${SPARK_PATH}/jars/${ICEBERG_FILE}"
fi
@@ -31,3 +35,7 @@ export PATH=${SPARK_PATH}:${SPARK_PATH}/python:${SPARK_PATH}/bin:${SPARK_PATH}/s
# Make sure we have a history directory
mkdir -p /tmp/spark-events

mkdir -p ./data/fetched/
if [ ! -f ./data/fetched/2021 ]; then
wget "https://gender-pay-gap.service.gov.uk/viewing/download-data/2021" -O ./data/fetched/2021
fi
4 changes: 2 additions & 2 deletions python/examples/bad_pyspark.py
@@ -159,8 +159,8 @@ def _setupTest():
def _test():
"""
Run the tests.
Note this will print a lot of error message to stderr since we don't capture the JVM sub process
stdout/stderr for doctests.
Note this will print a lot of error message to stderr since we don't
capture the JVM sub process stdout/stderr for doctests.
"""
import doctest

52 changes: 52 additions & 0 deletions python/examples/pandera_ex.py
@@ -0,0 +1,52 @@
from pyspark.sql.session import SparkSession

# tag::pandera_imports[]
import pandera.pyspark as pa
import pyspark.sql.types as T

# end::pandera_imports[]


# tag::simple_data_schema[]
class ProjectDataSchema(pa.DataFrameModel):
# Note str_length is currently broken :/
creator: T.StringType() = pa.Field(str_length={"min_value": 1})
projectname: T.StringType() = pa.Field()
stars: T.IntegerType() = pa.Field(ge=0)


# end::simple_data_schema[]


# tag::gender_data[]
class GenderData(pa.DataFrameModel):
MaleBonusPercent: T.DoubleType() = pa.Field(nullable=True, le=5)
FemaleBonusPercent: T.DoubleType() = pa.Field(nullable=True)
CompanyNumber: T.IntegerType() = pa.Field()


# end::gender_data[]

if __name__ == "__main__":
spark = SparkSession.builder.master("local[4]").getOrCreate()
# Make sure to make
# "https://gender-pay-gap.service.gov.uk/viewing/download-data/2021"
# available as ./data/fetched/2021 (env_setup.sh fetches it)
uk_df = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)

# tag::validate_gender_data[]
validated_df = GenderData(uk_df)
# Print out the errors. You may wish to exit with an error condition.
if validated_df.pandera.errors != {}:
print(validated_df.pandera.errors)
# sys.exit(1)
# end::validate_gender_data[]

# tag::validate_project_data[]
project_data = spark.read.csv("./data/project.csv", header=True, inferSchema=True)
validated_df = ProjectDataSchema(project_data)
# Print out the errors. You may wish to exit with an error condition.
if validated_df.pandera.errors != {}:
print(validated_df.pandera.errors)
# sys.exit(1)
# end::validate_project_data[]
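The example above prints any validation errors but leaves the hard failure commented out. A minimal sketch of failing the run instead, reusing the GenderData model from pandera_ex.py and the fetched 2021 CSV (the sys.exit(1) call and the import path are assumptions here, not part of the committed example):

import sys

from pyspark.sql import SparkSession

# Assumes pandera_ex.py (above) is on the PYTHONPATH and the 2021 CSV was
# fetched by env_setup.sh.
from pandera_ex import GenderData

spark = SparkSession.builder.master("local[4]").getOrCreate()
uk_df = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)

# Validates uk_df against the model, as in the example above.
validated = GenderData(uk_df)
if validated.pandera.errors != {}:
    # Surface the errors and fail so a CI job flags the bad data.
    print(validated.pandera.errors)
    sys.exit(1)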
5 changes: 4 additions & 1 deletion python/examples/simple_perf.py
@@ -1,5 +1,8 @@
# When running this example make sure to include the built Scala jar :
# $SPARK_HOME/bin/pyspark --jars ./target/examples-0.0.1.jar --driver-class-path ./target/examples-0.0.1.jar
#
# $SPARK_HOME/bin/pyspark --jars \
# ./target/examples-0.0.1.jar --driver-class-path ./target/examples-0.0.1.jar
#
# This example illustrates how to interface Scala and Python code, but caution
# should be taken as it depends on many private members that may change in
# future releases of Spark.
105 changes: 105 additions & 0 deletions python/examples/spark_expectations_example.py
@@ -0,0 +1,105 @@
from pyspark import SparkFiles
from pyspark.sql import *
from spark_expectations.core.expectations import SparkExpectations

spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext

# tag::global_setup[]
from spark_expectations.config.user_config import *

se_global_spark_Conf = {
"se_notifications_enable_email": False,
"se_notifications_email_smtp_host": "mailhost.example.com",
"se_notifications_email_smtp_port": 25,
"se_notifications_email_from": "[email protected]",
"se_notifications_email_subject": "spark expectations - data quality - notifications",
"se_notifications_on_fail": True,
"se_notifications_on_error_drop_exceeds_threshold_breach": True,
"se_notifications_on_error_drop_threshold": 15,
"se_enable_streaming": False, # Required or tries to publish to kafka.
}
# end::global_setup[]


# tag::setup_and_load[]
spark.sql("DROP TABLE IF EXISTS local.magic_validation")
spark.sql(
"""
create table local.magic_validation (
product_id STRING,
table_name STRING,
rule_type STRING,
rule STRING,
column_name STRING,
expectation STRING,
action_if_failed STRING,
tag STRING,
description STRING,
enable_for_source_dq_validation BOOLEAN,
enable_for_target_dq_validation BOOLEAN,
is_active BOOLEAN,
enable_error_drop_alert BOOLEAN,
error_drop_threshold INT
)"""
)
spark.sql(
"""
create table if not exists local.pay_stats (
product_id STRING,
table_name STRING,
input_count LONG,
error_count LONG,
output_count LONG,
output_percentage FLOAT,
success_percentage FLOAT,
error_percentage FLOAT,
source_agg_dq_results array<map<string, string>>,
final_agg_dq_results array<map<string, string>>,
source_query_dq_results array<map<string, string>>,
final_query_dq_results array<map<string, string>>,
row_dq_res_summary array<map<string, string>>,
row_dq_error_threshold array<map<string, string>>,
dq_status map<string, string>,
dq_run_time map<string, float>,
dq_rules map<string, map<string,int>>,
meta_dq_run_id STRING,
meta_dq_run_date DATE,
meta_dq_run_datetime TIMESTAMP
);"""
)
rule_file = "./spark_expectations_sample_rules.json"
sc.addFile(rule_file)
df = spark.read.json(SparkFiles.get(rule_file))
print(df)
df.write.option("byname", "true").mode("append").saveAsTable("local.magic_validation")
spark.read.table("local.magic_validation").show()
se: SparkExpectations = SparkExpectations(
product_id="pay", debugger=True # Used to filter which rules we apply
)
# end::setup_and_load[]


# tag::run_validation[]
# Only row data quality checking
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="local.magic_validation",
target_table_name="local.bonuses",
dq_stats_table_name="local.pay_stats",
),
write_to_table=False,
row_dq=True,
# This does not work currently (Iceberg)
spark_conf={"format": "iceberg"},
options={"format": "iceberg"},
options_error_table={"format": "iceberg"},
)
def load_data():
raw_df = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)
uk_df = raw_df.select("CompanyNumber", "MaleBonusPercent", "FemaleBonuspercent")
return uk_df


data = load_data()
# end::run_validation[]
1 change: 1 addition & 0 deletions python/examples/spark_expectations_sample_rules.json
@@ -0,0 +1 @@
{"product_id": "pay", "table_name": "local.bonuses", "rule_type": "row_dq", "rule": "bonus_checker", "column_name": "MaleBonusPercent", "expectation": "MaleBonusPercent > FemaleBonusPercent", "action_if_failed": "drop", "tag": "", "description": "Sample rule that the male bonuses should be higher. Thankfully this fails (but could be lower base pay etc.)", "enable_for_source_dq_validation": true, "enable_for_target_dq_validation": true, "is_active": true, "enable_error_drop_alert": true, "error_drop_threshold": 1}
6 changes: 6 additions & 0 deletions python/requirements.txt
@@ -3,3 +3,9 @@ pandas
pyarrow
pyspark
pyspark-asyncactions
pandera
pandera[pyspark]
spark-expectations
venv-pack
delta-spark
requests
2 changes: 1 addition & 1 deletion python/tox.ini
@@ -56,7 +56,7 @@ deps =
[testenv:flake8]
extras = tests
skipsdist = True
commands = flake8 examples
commands = flake8 --ignore=F403,E402,F401,F405 examples
allowlist_externals = flake8

[testenv:mypy]