
Commit beb4998

Spark 4: Update GHA & Get Tests Running Again (#140)
* Update GHA
* Add missing uses for setting up the JDK
* Add sbt explicitly now
* Bump spark testing version
* Update more to Spark 4, except PySpark ex which uses Iceberg leave that at 3.5
* remove loadsave for Spark3 compilation with PySpark.
* Install proto if needed.
* Fix rm
* Add distutils fix typo
* Setuptools
1 parent c09658e commit beb4998

File tree

9 files changed: +264 -211 lines changed

.github/workflows/ci.yml

Lines changed: 235 additions & 173 deletions
Large diffs are not rendered by default.

accelerators/setup_comet.sh

Lines changed: 6 additions & 0 deletions
@@ -3,6 +3,12 @@
 set -ex
 source install_rust_if_needed.sh
 
+if command -v protoc >/dev/null 2>&1; then
+  echo "protoc already installed"
+else
+  sudo apt-get install -y protobuf-compiler
+fi
+
 if [ -z "${SPARK_MAJOR}" ]; then
   echo "Need a spark major version specified."
   exit 1

build.sbt

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ lazy val core = (project in file("core")) // regular scala code with @native met
     Test / javaOptions ++= specialOptions,
     // 2.4.5 is the highest version we have with the old spark-testing-base deps
     sparkVersion := System.getProperty("sparkVersion", "4.0.0"),
-    sparkTestingVersion := "2.0.1",
+    sparkTestingVersion := "2.1.2",
     // additional libraries
     libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % sparkVersion.value % Provided,

core/src/main/java/com/highperformancespark/examples/dataframe/JavaHappyPandas.java

Lines changed: 9 additions & 26 deletions
@@ -4,10 +4,9 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.expressions.Window;
 import org.apache.spark.sql.expressions.WindowSpec;
-import org.apache.spark.sql.hive.HiveContext;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -16,39 +15,23 @@
 
 public class JavaHappyPandas {
 
-  /**
-   * Creates SQLContext with an existing SparkContext.
-   */
-  public static SQLContext sqlContext(JavaSparkContext jsc) {
-    SQLContext sqlContext = new SQLContext(jsc);
-    return sqlContext;
-  }
-
-  /**
-   * Creates HiveContext with an existing SparkContext.
-   */
-  public static HiveContext hiveContext(JavaSparkContext jsc) {
-    HiveContext hiveContext = new HiveContext(jsc);
-    return hiveContext;
-  }
-
   /**
    * Illustrate loading some JSON data.
    */
-  public static Dataset<Row> loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, String path) {
-    Dataset<Row> df1 = sqlContext.read().json(path);
+  public static Dataset<Row> loadDataSimple(JavaSparkContext jsc, SparkSession session, String path) {
+    Dataset<Row> df1 = session.read().json(path);
 
-    Dataset<Row> df2 = sqlContext.read().format("json").option("samplingRatio", "1.0").load(path);
+    Dataset<Row> df2 = session.read().format("json").option("samplingRatio", "1.0").load(path);
 
     JavaRDD<String> jsonRDD = jsc.textFile(path);
-    Dataset<Row> df3 = sqlContext.read().json(jsonRDD);
+    Dataset<Row> df3 = session.read().json(jsonRDD);
 
     return df1;
   }
 
-  public static Dataset<Row> jsonLoadFromRDD(SQLContext sqlContext, JavaRDD<String> input) {
+  public static Dataset<Row> jsonLoadFromRDD(SparkSession session, JavaRDD<String> input) {
     JavaRDD<String> rdd = input.filter(e -> e.contains("panda"));
-    Dataset<Row> df = sqlContext.read().json(rdd);
+    Dataset<Row> df = session.read().json(rdd);
     return df;
   }
 
@@ -147,10 +130,10 @@ public static Dataset<Row> minMeanSizePerZip(Dataset<Row> pandas) {
   }
 
   public static Dataset<Row> simpleSqlExample(Dataset<Row> pandas) {
-    SQLContext sqlContext = pandas.sqlContext();
+    SparkSession session = SparkSession.builder().getOrCreate();
     pandas.registerTempTable("pandas");
 
-    Dataset<Row> miniPandas = sqlContext.sql("SELECT * FROM pandas WHERE pandaSize < 12");
+    Dataset<Row> miniPandas = session.sql("SELECT * FROM pandas WHERE pandaSize < 12");
     return miniPandas;
   }
 
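
The replacement pattern in this file, and in the Scala examples below, is the same: instead of wrapping a SparkContext in an SQLContext or HiveContext, callers obtain the active SparkSession and read or query through it. A minimal Scala sketch of that pattern (the app name, path, and view name are illustrative, not taken from the repo):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

// getOrCreate() returns the session already started by spark-submit or the test
// harness, or builds a new one, so helpers no longer need a context handed in.
val session = SparkSession.builder()
  .appName("happy-pandas-sketch") // illustrative
  .getOrCreate()

val pandas: Dataset[Row] = session.read
  .format("json")
  .option("samplingRatio", "1.0")
  .load("/tmp/pandas.json") // illustrative path

// createOrReplaceTempView is the non-deprecated equivalent of the
// registerTempTable call kept in the hunk above.
pandas.createOrReplaceTempView("pandas")
val miniPandas = session.sql("SELECT * FROM pandas WHERE pandaSize < 12")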

core/src/main/scala/com/high-performance-spark-examples/dataframe/HappyPandas.scala

Lines changed: 5 additions & 4 deletions
@@ -348,18 +348,19 @@ object HappyPandas {
    * Cut the lineage of a DataFrame which has too long a query plan.
    */
   def cutLineage(df: DataFrame): DataFrame = {
-    val sqlCtx = df.sqlContext
+    val session = SparkSession.builder.getOrCreate()
+    import session.implicits._
     //tag::cutLineage[]
     val rdd = df.rdd
     rdd.cache()
-    sqlCtx.createDataFrame(rdd, df.schema)
+    session.createDataFrame(rdd, df.schema)
     //end::cutLineage[]
   }
 
   // Self join
   def selfJoin(df: DataFrame): DataFrame = {
-    val sqlCtx = df.sqlContext
-    import sqlCtx.implicits._
+    val session = SparkSession.builder.getOrCreate()
+    import session.implicits._
     //tag::selfJoin[]
     val joined = df.as("a").join(df.as("b")).where($"a.name" === $"b.name")
     //end::selfJoin[]
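
One detail worth calling out in selfJoin: the implicits import is what puts the $-column interpolator in scope, so it now has to come from the session handle rather than from df.sqlContext. A short sketch mirroring the hunk above:

import org.apache.spark.sql.{DataFrame, SparkSession}

def selfJoinSketch(df: DataFrame): DataFrame = {
  val session = SparkSession.builder.getOrCreate()
  import session.implicits._ // provides the $"colName" syntax used below
  df.as("a").join(df.as("b")).where($"a.name" === $"b.name")
}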

core/src/main/scala/com/high-performance-spark-examples/dataframe/NullabilityFilterOptimizer.scala

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ import org.apache.spark.sql.catalyst.optimizer._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, NullIntolerant}
+import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull}
 
 object NullabilityFilterOptimizer extends Rule[LogicalPlan] {
 
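
Only the import list changes here; the rule body is untouched. For readers who have not seen this file, the following is a hypothetical sketch of what a Rule[LogicalPlan] in this style can look like; it is illustrative only and is not the repo's actual optimizer rule. Note that adding IsNotNull guards is only semantics-preserving when the predicate is null-intolerant, which is presumably why the rule previously referenced NullIntolerant:

import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object ExampleNotNullRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Add an IsNotNull guard for every attribute the filter condition touches,
    // skipping filters that already carry such guards so the rule is idempotent.
    case Filter(condition, child)
        if condition.collect { case n: IsNotNull => n }.isEmpty =>
      val guards: Seq[Expression] = condition.references.toSeq.map(IsNotNull(_))
      Filter(guards.foldLeft(condition)(And(_, _)), child)
  }
}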

core/src/main/scala/com/high-performance-spark-examples/ml/SimplePipeline.scala

Lines changed: 0 additions & 3 deletions
@@ -17,7 +17,6 @@ import com.highperformancespark.examples.dataframe._
 
 object SimplePipeline {
   def constructAndSetParams(df: DataFrame) = {
-    val sqlCtx = df.sqlContext
     //tag::constructSetParams[]
     val hashingTF = new HashingTF()
     hashingTF.setInputCol("input")
@@ -26,7 +25,6 @@ object SimplePipeline {
   }
 
   def constructSimpleTransformer(df: DataFrame) = {
-    val sqlCtx = df.sqlContext
     //tag::simpleTransformer[]
     val hashingTF = new HashingTF()
     // We don't set the output column here so the default output column of
@@ -62,7 +60,6 @@
   }
 
   def constructSimpleEstimator(df: DataFrame) = {
-    val sqlCtx = df.sqlContext
     //tag::simpleNaiveBayes[]
     val nb = new NaiveBayes()
     nb.setLabelCol("happy")
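
The removed val sqlCtx = df.sqlContext lines appear to have been unused: the ML stages are configured and applied against the DataFrame directly, so no context handle is needed. A small sketch of that pattern, assuming an input column of tokenized terms named "input" as in the hunk above (the output column name is illustrative):

import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.DataFrame

def hashFeaturesSketch(df: DataFrame): DataFrame = {
  val hashingTF = new HashingTF()
  hashingTF.setInputCol("input")          // expects a column of Seq[String] terms
  hashingTF.setOutputCol("hashed_terms")  // illustrative name
  hashingTF.transform(df)                 // no SQLContext/SparkSession handle involved
}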

env_setup.sh

Lines changed: 2 additions & 2 deletions
@@ -4,7 +4,7 @@ set -ex
 
 # Download Spark and iceberg if not present
 SPARK_MAJOR=${SPARK_MAJOR:-"3.5"}
-SPARK_VERSION=${SPARK_VERSION:-"${SPARK_MAJOR}.2"}
+SPARK_VERSION=${SPARK_VERSION:-"${SPARK_MAJOR}.3"}
 SCALA_VERSION=${SCALA_VERSION:-"2.13"}
 HADOOP_VERSION="3"
 SPARK_PATH="$(pwd)/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
@@ -13,7 +13,7 @@ if [ "$SCALA_VERSION" = "2.13" ]; then
   SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3-scala2.13.tgz"
   SPARK_PATH="$(pwd)/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala2.13"
 fi
-ICEBERG_VERSION=${ICEBERG_VERSION:-"1.6.0"}
+ICEBERG_VERSION=${ICEBERG_VERSION:-"1.9.2"}
 if [ ! -f "${SPARK_FILE}" ]; then
   SPARK_DIST_URL="https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}"
   SPARK_ARCHIVE_DIST_URL="https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_FILE}"

run_pyspark_examples.sh

Lines changed: 5 additions & 1 deletion
@@ -42,8 +42,12 @@ function check_fail () {
 
 EXAMPLE_JAR="./core/target/scala-2.13/core-assembly-0.1.0-SNAPSHOT.jar"
 
+pip install setuptools
+
+# Iceberg JAR not yet available for Spark 4.
 if [ ! -f "${EXAMPLE_JAR}" ]; then
-  sbt core/assembly
+  rm ./core/src/main/scala/com/high-performance-spark-examples/dataframe/LoadSave.scala # temp hack no merge in Spark 3.
+  sbt core/assembly -DsparkVersion="${SPARK_VERSION}"
 fi
 
 if [ ! -f "${EXAMPLE_JAR}" ]; then
0 commit comments

Comments
 (0)