Unify python examples and update run scripts (#111)
* Unify the PySpark examples (not sure why they were separated) and add a script to run them, same as the SQL examples.

* Try to make the run scripts more flexible.

* Change how we trigger OOMing.

* Skip doctest of OOM since it puts SparkContext into a bad state.

* Add a quote and disable SC2046
holdenk authored Sep 5, 2023
1 parent 79acfc1 commit 5ff04dc
Showing 8 changed files with 102 additions and 79 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
@@ -58,6 +58,21 @@ jobs:
- name: Run sql examples
run:
./run_sql_examples.sh
run-pyspark-examples:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Cache Spark and friends
uses: actions/cache@v3
with:
path: |
spark*.tgz
iceberg*.jar
key: spark-artifacts
- name: Run PySpark examples
run:
./run_pyspark_examples.sh
style:
runs-on: ubuntu-latest
steps:
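For reference, the flow the new CI job exercises can be reproduced locally with the scripts touched by this commit; this is a sketch and assumes wget, tar, and a Java runtime are available:

    # run the PySpark examples the same way the new CI job does
    ./run_pyspark_examples.sh
    # the SQL examples use the matching script from the existing job
    ./run_sql_examples.sh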
3 changes: 2 additions & 1 deletion .gitignore
@@ -77,4 +77,5 @@ warehouse/
metastore_db/

# Misc internal stuff
sql/*.sql.out
sql/*.sql.out
python/examples/*.py.out
33 changes: 33 additions & 0 deletions env_setup.sh
@@ -0,0 +1,33 @@
#!/bin/bash


# Download Spark and iceberg if not present
SPARK_MAJOR="3.4"
SPARK_VERSION=3.4.1
HADOOP_VERSION="3"
SPARK_PATH="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
ICEBERG_VERSION="1.3.1"
if [ ! -f "${SPARK_FILE}" ]; then
wget "https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}" &
fi
# Download Iceberg if not present
ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_2.13-${ICEBERG_VERSION}.jar"
if [ ! -f "${ICEBERG_FILE}" ]; then
wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_2.13/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
fi
wait
# Setup the env
if [ ! -d "${SPARK_PATH}" ]; then
tar -xf ${SPARK_FILE}
fi
if [ ! -f "${SPARK_PATH}/jars/${ICEBERG_FILE}" ]; then
cp "${ICEBERG_FILE}" "${SPARK_PATH}/jars/${ICEBERG_FILE}"
fi

# Set up for running pyspark and friends
export PATH=${SPARK_PATH}:${SPARK_PATH}/python:${SPARK_PATH}/bin:${SPARK_PATH}/sbin:${PATH}

# Make sure we have a history directory
mkdir -p /tmp/spark-events
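
A minimal sketch of how a caller is expected to consume env_setup.sh; sourcing (rather than executing) it is what lets the exported PATH take effect, and the version check below is illustrative, not part of this commit:

    #!/bin/bash
    set -e
    # download (or reuse cached) Spark and Iceberg, then extend PATH
    source env_setup.sh
    # spark-submit should now resolve from the freshly extracted Spark distribution
    spark-submit --version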

@@ -1,3 +1,13 @@
from pyspark.sql import DataFrame, Row
from pyspark.sql.session import SparkSession
import sys

global df
global sc
global rdd
global spark


"""
>>> df = rdd.toDF()
>>> df2 = cutLineage(df)
@@ -7,14 +17,6 @@
True
"""

global df
global sc
global rdd
global spark

from pyspark.context import SparkContext
from pyspark.sql import DataFrame, Row
from pyspark.sql.session import SparkSession

# tag::cutLineage[]
def cutLineage(df):
@@ -31,11 +33,8 @@ def cutLineage(df):
jSchema = df._jdf.schema()
jRDD.cache()
sqlCtx = df.sql_ctx
try:
javaSqlCtx = sqlCtx._jsqlContext
except:
javaSqlCtx = sqlCtx._ssql_ctx
newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema)
javaSparkSession = sqlCtx._jSparkSession
newJavaDF = javaSparkSession.createDataFrame(jRDD, jSchema)
newDF = DataFrame(newJavaDF, sqlCtx)
return newDF

@@ -50,7 +49,7 @@ def _setupTest():
sc.setLogLevel("ERROR")
globs["sc"] = sc
globs["spark"] = spark
globs["rdd"] = rdd = sc.parallelize(
globs["rdd"] = sc.parallelize(
[
Row(field1=1, field2="row1"),
Row(field1=2, field2="row2"),
@@ -75,8 +74,6 @@ def _test():
exit(-1)


import sys

if __name__ == "__main__":
_test()
# Hack to support running in nose
@@ -1,7 +1,7 @@
# This script triggers a number of different PySpark errors

from pyspark import *
from pyspark.sql.session import SparkSession
import sys

global sc

@@ -131,22 +131,20 @@ def loggedDivZero(x):

def runOutOfMemory(sc):
"""
Run out of memory on the workers.
In standalone modes results in a memory error, but in YARN may trigger YARN container
overhead errors.
>>> runOutOfMemory(sc)
Run out of memory on the workers from a skewed shuffle.
>>> runOutOfMemory(sc) # doctest: +SKIP
Traceback (most recent call last):
...
Py4JJavaError:...
"""
# tag::worker_oom[]
data = sc.parallelize(range(10))
data = sc.parallelize(range(10000))

def generate_too_much(itr):
return range(10000000000000)
def generate_too_much(i: int):
return list(map(lambda v: (i % 2, v), range(100000 * i)))

itr = data.flatMap(generate_too_much)
itr.count()
bad = data.flatMap(generate_too_much).groupByKey()
bad.count()
# end::worker_oom[]


@@ -166,17 +164,18 @@ def _test():
"""
import doctest

globs = setupTest()
globs = _setupTest()
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS
)
print("All tests done, stopping Spark context.")
globs["sc"].stop()
if failure_count:
exit(-1)
else:
exit(0)


import sys

if __name__ == "__main__":
_test()
# Hack to support running in nose
@@ -4,8 +4,9 @@
# should be taken as it depends on many private members that may change in
# future releases of Spark.

from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.types import StructType, IntegerType, DoubleType, StructField
from pyspark.sql import DataFrame, SparkSession
import sys
import timeit
import time

@@ -29,14 +30,7 @@ def generate_scale_data(sqlCtx, rows, numCols):
"""
# tag::javaInterop[]
sc = sqlCtx._sc
# Get the SQL Context, 2.1, 2.0 and pre-2.0 syntax - yay internals :p
try:
try:
javaSqlCtx = sqlCtx._jsqlContext
except:
javaSqlCtx = sqlCtx._ssql_ctx
except:
javaSqlCtx = sqlCtx._jwrapped
javaSparkSession = sqlCtx._jSparkSession
jsc = sc._jsc
scalasc = jsc.sc()
gateway = sc._gateway
@@ -54,13 +48,9 @@ def generate_scale_data(sqlCtx, rows, numCols):
schema = StructType(
[StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]
)
# 2.1 / pre-2.1
try:
jschema = javaSqlCtx.parseDataType(schema.json())
except:
jschema = sqlCtx._jsparkSession.parseDataType(schema.json())
jschema = javaSparkSession.parseDataType(schema.json())
# Convert the Java RDD to Java DataFrame
java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema)
java_dataframe = javaSparkSession.createDataFrame(java_rdd, jschema)
# Wrap the Java DataFrame into a Python DataFrame
python_dataframe = DataFrame(java_dataframe, sqlCtx)
# Convert the Python DataFrame into an RDD
@@ -143,13 +133,9 @@ def parseArgs(args):


if __name__ == "__main__":

"""
Usage: simple_perf_test scalingFactor size
"""
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext

(scalingFactor, size) = parseArgs(sys.argv)
session = SparkSession.appName("SimplePythonPerf").builder.getOrCreate()
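A sketch of invoking this performance test directly, outside the run script; the file path below is hypothetical (this view does not show the file name), and the two positional arguments follow the Usage string above (scalingFactor, then size):

    # hypothetical path; substitute the actual example file name
    spark-submit --master local[5] python/examples/simple_perf_test.py 1 1000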
20 changes: 20 additions & 0 deletions run_pyspark_examples.sh
@@ -0,0 +1,20 @@
#!/bin/bash

source env_setup.sh

pip install -r ./python/requirements.txt

for ex in python/examples/*.py; do
# shellcheck disable=SC2046
spark-submit \
--master local[5] \
--conf spark.eventLog.enabled=true \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf "spark.sql.catalog.local.warehouse=$PWD/warehouse" \
$(cat "${ex}.conf" || echo "") \
--name "${ex}" \
"${ex}" 2>&1 | tee -a "${ex}.out"
done
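
The unquoted $(cat "${ex}.conf" || echo "") above is why SC2046 is disabled: word splitting turns an optional per-example conf file into extra spark-submit arguments. A hypothetical python/examples/some_example.py.conf illustrating the idea (the file name and settings are assumptions, not part of this commit):

    --conf spark.executor.memory=1g
    --conf spark.sql.shuffle.partitions=8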
30 changes: 1 addition & 29 deletions run_sql_examples.sh
@@ -1,35 +1,7 @@
#!/bin/bash
set -ex

# Download Spark and iceberg if not present
SPARK_MAJOR="3.4"
SPARK_VERSION=3.4.1
HADOOP_VERSION="3"
SPARK_PATH="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
ICEBERG_VERSION="1.3.1"
if [ ! -f "${SPARK_FILE}" ]; then
wget "https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}" &
fi
# Download Icberg if not present
ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_2.13-${ICEBERG_VERSION}.jar"
if [ ! -f "${ICEBERG_FILE}" ]; then
wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_2.13/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
fi
wait
# Setup the env
if [ ! -d "${SPARK_PATH}" ]; then
tar -xf ${SPARK_FILE}
fi
if [ ! -f "${SPARK_PATH}/jars/${ICEBERG_FILE}" ]; then
cp "${ICEBERG_FILE}" "${SPARK_PATH}/jars/${ICEBERG_FILE}"
fi

# Set up for running pyspark and friends
export PATH=${SPARK_PATH}:${SPARK_PATH}/python:${SPARK_PATH}/bin:${SPARK_PATH}/sbin:${PATH}

# Make sure we have a history directory
mkdir -p /tmp/spark-events
source env_setup.sh

# We use `` for mid multi-line command comments. (see https://stackoverflow.com/questions/9522631/how-to-put-a-line-comment-for-a-multi-line-command).
# For each SQL
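The backtick comment technique referenced at the end of run_sql_examples.sh, shown in isolation; the command and flag are placeholders:

    some_command \
      `# a command substitution that expands to nothing, so it acts as an inline comment` \
      --some-flag value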
