Fix Python CI errors (#109)
* Run black for Python formatting.

* ooh some Python 2 code was still around... my bad.

* Add missing deps for python tox
holdenk authored Aug 28, 2023
1 parent 7ba42b2 commit 3c48c54
Showing 5 changed files with 93 additions and 40 deletions.
34 changes: 22 additions & 12 deletions high_performance_pyspark/SQLLineage.py
@@ -20,7 +20,7 @@
def cutLineage(df):
"""
Cut the lineage of a DataFrame - used for iterative algorithms
.. Note: This uses internal members and may break between versions
>>> df = rdd.toDF()
>>> cutDf = cutLineage(df)
@@ -38,35 +38,45 @@ def cutLineage(df):
newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema)
newDF = DataFrame(newJavaDF, sqlCtx)
return newDF


# end::cutLineage[]
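The docstring above only hints at the intended use; below is a minimal sketch of the iterative-algorithm scenario, assuming a local SparkSession and this module on the path (the loop itself is hypothetical, not from the repo).

from pyspark.sql import Row, SparkSession
from high_performance_pyspark.SQLLineage import cutLineage

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame([Row(field1=i, field2="row%d" % i) for i in range(3)])
for i in range(100):
    # Each pass stacks another projection onto the logical plan.
    df = df.withColumn("field1", df["field1"] + 1)
    if i % 10 == 0:
        # Periodically swap the ever-growing plan for a fresh one.
        df = cutLineage(df)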


def _setupTest():
globs = globals()
spark = SparkSession.builder \
.master("local[4]") \
.getOrCreate()
spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark._sc
sc.setLogLevel("ERROR")
globs['sc'] = sc
globs['spark'] = spark
globs['rdd'] = rdd = sc.parallelize(
[Row(field1=1, field2="row1"),
Row(field1=2, field2="row2"),
Row(field1=3, field2="row3")])
globs["sc"] = sc
globs["spark"] = spark
globs["rdd"] = rdd = sc.parallelize(
[
Row(field1=1, field2="row1"),
Row(field1=2, field2="row2"),
Row(field1=3, field2="row3"),
]
)
return globs


def _test():
"""
Run the tests.
"""
import doctest

globs = _setupTest()
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS
)
globs["sc"].stop()
if failure_count:
exit(-1)


import sys

if __name__ == "__main__":
_test()
# Hack to support running in nose
1 change: 0 additions & 1 deletion high_performance_pyspark/__init__.py
@@ -22,4 +22,3 @@

import os
import sys

36 changes: 28 additions & 8 deletions high_performance_pyspark/bad_pyspark.py
@@ -5,6 +5,7 @@

global sc


def nonExistentInput(sc):
"""
Attempt to load non existent input
@@ -18,6 +19,7 @@ def nonExistentInput(sc):
failedRdd.count()
# end::nonExistent[]


def throwOuter(sc):
"""
Attempt to load non existent input
@@ -33,6 +35,7 @@ def throwOuter(sc):
transform2.count()
# end::throwOuter[]


def throwInner(sc):
"""
Attempt to load non existent input
@@ -48,6 +51,7 @@ def throwInner(sc):
transform2.count()
# end::throwInner[]


# tag::rewrite[]
def add1(x):
"""
@@ -57,6 +61,7 @@ def add1(x):
"""
return x + 1


def divZero(x):
"""
Divide by zero (cause an error)
@@ -67,6 +72,7 @@ def divZero(x):
"""
return x / 0


def throwOuter2(sc):
"""
Attempt to load non existent input
@@ -80,6 +86,7 @@ def throwOuter2(sc):
transform2 = transform1.map(divZero)
transform2.count()


def throwInner2(sc):
"""
Attempt to load non existent input
@@ -92,8 +99,11 @@ def throwInner2(sc):
transform1 = data.map(divZero)
transform2 = transform1.map(add1)
transform2.count()


# end::rewrite[]


def throwInner3(sc):
"""
Attempt to load non existent input
@@ -102,14 +112,17 @@ def throwInner3(sc):
"""
data = sc.parallelize(range(10))
rejectedCount = sc.accumulator(0)

def loggedDivZero(x):
import logging

try:
return [x / 0]
except Exception as e:
rejectedCount.add(1)
logging.warning("Error found " + repr(e))
return []

transform1 = data.flatMap(loggedDivZero)
transform2 = transform1.map(add1)
transform2.count()
@@ -128,35 +141,42 @@ def runOutOfMemory(sc):
"""
# tag::worker_oom[]
data = sc.parallelize(range(10))

def generate_too_much(itr):
return range(10000000000000)

itr = data.flatMap(generate_too_much)
itr.count()
# end::worker_oom[]


def _setupTest():
globs = globals()
spark = SparkSession.builder \
.master("local[4]") \
.getOrCreate()
spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark._sc
globs['sc'] = sc
globs["sc"] = sc
return globs



def _test():
"""
Run the tests.
Run the tests.
Note this will print a lot of error messages to stderr since we don't capture the JVM subprocess
stdout/stderr for doctests.
"""
import doctest

globs = _setupTest()
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS
)
globs["sc"].stop()
if failure_count:
exit(-1)


import sys

if __name__ == "__main__":
_test()
# Hack to support running in nose
59 changes: 40 additions & 19 deletions high_performance_pyspark/simple_perf.py
@@ -9,6 +9,7 @@
import timeit
import time


def generate_scale_data(sqlCtx, rows, numCols):
"""
Generate scale data for the performance test.
@@ -45,14 +46,14 @@ def generate_scale_data(sqlCtx, rows, numCols):
# This returns a Java RDD of Rows - normally it would be better to
# return a DataFrame directly, but for illustration we will work
# with an RDD of Rows.
java_rdd = (gateway.jvm.com.highperformancespark.examples.
tools.GenerateScalingData.
generateMiniScaleRows(scalasc, rows, numCols))
java_rdd = gateway.jvm.com.highperformancespark.examples.tools.GenerateScalingData.generateMiniScaleRows(
scalasc, rows, numCols
)
# Schemas are serialized to JSON and sent back and forth
# Construct a Python Schema and turn it into a Java Schema
schema = StructType([
StructField("zip", IntegerType()),
StructField("fuzzyness", DoubleType())])
schema = StructType(
[StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]
)
# 2.1 / pre-2.1
try:
jschema = javaSqlCtx.parseDataType(schema.json())
@@ -67,19 +68,25 @@ def generate_scale_data(sqlCtx, rows, numCols):
return (python_dataframe, pairRDD)
# end::javaInterop[]


def runOnDF(df):
result = df.groupBy("zip").avg("fuzzyness").count()
return result


def runOnRDD(rdd):
result = rdd.map(lambda (x, y): (x, (y, 1))). \
reduceByKey(lambda x, y: (x[0] + y [0], x[1] + y[1])). \
count()
result = (
rdd.map(lambda kv: (kv[0], (kv[1], 1)))
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
.count()
)
return result
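The removed form above relies on Python 2's tuple-parameter unpacking, which PEP 3113 dropped: in Python 3 a map callable over a pair RDD receives the whole (key, value) tuple as a single argument. A hedged sketch of the equivalent rewrite (the helper name is illustrative, not from the repo):

# Python 2 (removed): lambda (x, y): (x, (y, 1))
# Python 3: accept the pair as one argument and unpack it in the body.
def to_counted(kv):
    key, value = kv
    return (key, (value, 1))

# rdd.map(to_counted) behaves like the old tuple-unpacking lambda did.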


def groupOnRDD(rdd):
return rdd.groupByKey().mapValues(lambda v: sum(v) / float(len(v))).count()


def run(sc, sqlCtx, scalingFactor, size):
"""
Run the simple perf test printing the results to stdout.
@@ -98,17 +105,30 @@ def run(sc, sqlCtx, scalingFactor, size):
"""
(input_df, input_rdd) = generate_scale_data(sqlCtx, scalingFactor, size)
input_rdd.cache().count()
rddTimeings = timeit.repeat(stmt=lambda: runOnRDD(input_rdd), repeat=10, number=1, timer=time.time, setup='gc.enable()')
groupTimeings = timeit.repeat(stmt=lambda: groupOnRDD(input_rdd), repeat=10, number=1, timer=time.time, setup='gc.enable()')
rddTimeings = timeit.repeat(
stmt=lambda: runOnRDD(input_rdd),
repeat=10,
number=1,
timer=time.time,
setup="gc.enable()",
)
groupTimeings = timeit.repeat(
stmt=lambda: groupOnRDD(input_rdd),
repeat=10,
number=1,
timer=time.time,
setup="gc.enable()",
)
input_df.cache().count()
dfTimeings = timeit.repeat(stmt=lambda: runOnDF(input_df), repeat=10, number=1, timer=time.time, setup='gc.enable()')
print "RDD:"
print rddTimeings
print "group:"
print groupTimeings
print "df:"
print dfTimeings
print "yay"
dfTimeings = timeit.repeat(
stmt=lambda: runOnDF(input_df),
repeat=10,
number=1,
timer=time.time,
setup="gc.enable()",
)
print(f"RDD: {rddTimeings}, group: {groupTimeings}, df: {dfTimeings}")


def parseArgs(args):
"""
@@ -130,6 +150,7 @@ def parseArgs(args):
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext

(scalingFactor, size) = parseArgs(sys.argv)
session = SparkSession.builder.appName("SimplePythonPerf").getOrCreate()
sc = session._sc
3 changes: 3 additions & 0 deletions python/tox.ini
@@ -49,6 +49,9 @@ extras = tests
skipsdist = True
commands = black --check examples
allowlist_externals = black
deps =
black
-rrequirements.txt

[testenv:flake8]
extras = tests
