
Commit 42e37ec

Improve test and fix up native build
1 parent e23416d commit 42e37ec

3 files changed: +146 -54 lines changed

Lines changed: 41 additions & 13 deletions
@@ -1,11 +1,8 @@
 package com.highperformancespark.examples.structuredstreaming
 
-// Windowed aggregation with watermark on JSON input
-// Watermarking is needed to bound state and drop late data
-
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.streaming._
 
 object JsonWindowedAggExample {
   def main(args: Array[String]): Unit = {
@@ -14,25 +11,56 @@ object JsonWindowedAggExample {
       .appName("JsonWindowedAggExample")
       .master("local[2]")
       .getOrCreate()
+    run(spark)
+  }
+
+  def run(spark: SparkSession): Unit = {
+    val query = makeQuery(spark)
+    query.awaitTermination()
+  }
 
+  /** Your original behavior (console sink, no watermark, continuous). */
+  def makeQuery(spark: SparkSession): StreamingQuery = {
+    makeQueryWith(
+      spark,
+      inputPath = "/tmp/json_input",
+      checkpointDir = "/tmp/checkpoints/json_windowed_agg",
+      outputFormat = "console",
+      queryName = None,
+      trigger = Trigger.ProcessingTime("5 seconds"),
+      addWatermark = false
+    )
+  }
+
+  /** Parametric builder used by tests (and optional batch-like runs). */
+  def makeQueryWith(
+      spark: SparkSession,
+      inputPath: String,
+      checkpointDir: String,
+      outputFormat: String,
+      queryName: Option[String],
+      trigger: Trigger,
+      addWatermark: Boolean
+  ): StreamingQuery = {
     import spark.implicits._
-    // tag::streaming_ex_json_window[]
+
     val df = spark.readStream
       .format("json")
       .schema("timestamp TIMESTAMP, word STRING")
-      .load("/tmp/json_input")
+      .load(inputPath)
 
-    val windowed = df
+    val base = if (addWatermark) df.withWatermark("timestamp", "5 minutes") else df
+    val windowed = base
       .groupBy(window(col("timestamp"), "10 minutes"), col("word"))
       .count()
-    // end::streaming_ex_json_window[]
 
-    val query = windowed.writeStream
+    val writer = windowed.writeStream
       .outputMode("append")
-      .format("console")
-      .option("checkpointLocation", "./tmp/checkpoints/json_windowed_agg")
-      .start()
+      .format(outputFormat)
+      .option("checkpointLocation", checkpointDir)
+      .trigger(trigger)
 
-    query.awaitTermination()
+    val named = queryName.fold(writer)(n => writer.queryName(n))
+    named.start()
   }
 }
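A rough sketch of how the new makeQueryWith builder could be driven outside the test suite follows. It is illustrative only and not part of this commit: the object name JsonWindowedAggOnce, the /tmp paths, and the one-shot Trigger.Once() are assumptions.

package com.highperformancespark.examples.structuredstreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Hypothetical usage sketch (not part of this commit): drain whatever JSON is
// already in the input directory once, writing windowed counts to the console.
object JsonWindowedAggOnce {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JsonWindowedAggOnce")
      .master("local[2]")
      .getOrCreate()

    // Reuse the parametric builder added in this commit; the paths are placeholders.
    val query = JsonWindowedAggExample.makeQueryWith(
      spark,
      inputPath = "/tmp/json_input",
      checkpointDir = "/tmp/checkpoints/json_windowed_agg_once",
      outputFormat = "console",
      queryName = Some("json_windowed_agg_once"),
      trigger = Trigger.Once(),  // process what is available, then stop
      addWatermark = true        // bound state with the 5-minute watermark
    )
    query.awaitTermination()
    spark.stop()
  }
}

Because Trigger.Once() lets awaitTermination() return after a single micro-batch, the same builder serves batch-like smoke runs as well as the continuous console query that makeQuery still provides.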

core/src/test/scala/com/high-performance-spark-examples/streaming/structuredstreaming/JsonWindowedAggExampleSuite.scala

Lines changed: 100 additions & 41 deletions
@@ -1,51 +1,110 @@
 package com.highperformancespark.examples.structuredstreaming
 
-// tag::streaming_ex_json_window_test[]
-// Test for JsonWindowedAggExample: verifies late rows are dropped and state is bounded
-
 import org.scalatest.funsuite.AnyFunSuite
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.Trigger
+import java.nio.file.Files
 import java.sql.Timestamp
 
-class JsonWindowedAggExampleSuite extends AnyFunSuite {
-  test("windowed agg drops late rows beyond watermark") {
-    val spark = SparkSession.builder()
+class JsonWindowedAggExampleFileIT extends AnyFunSuite {
+
+  private def withSpark[T](f: SparkSession => T): T = {
+    val spark = SparkSession.builder()
+      .appName("JsonWindowedAggExampleFileIT")
       .master("local[2]")
-      .appName("JsonWindowedAggExampleSuite")
+      .config("spark.ui.enabled", "false")
+      .config("spark.sql.shuffle.partitions", "2")
       .getOrCreate()
-    import spark.implicits._
-
-    import org.apache.spark.sql.execution.streaming.MemoryStream
-    val inputStream = MemoryStream[(Timestamp, String)](1, spark.sqlContext)
-    val now = System.currentTimeMillis()
-    val rows = Seq(
-      (new Timestamp(now - 1000 * 60 * 5), "foo"), // within window
-      (new Timestamp(now - 1000 * 60 * 50), "bar"), // late, beyond watermark
-      (new Timestamp(now - 1000 * 60 * 2), "foo") // within window
-    )
-    inputStream.addData(rows: _*)
-    val df = inputStream.toDF().toDF("timestamp", "word")
-    val withWatermark = df.withWatermark("timestamp", "42 minutes")
-    val windowed = withWatermark
-      .groupBy(window(col("timestamp"), "10 minutes"), col("word"))
-      .count()
-
-    val query = windowed.writeStream
-      .outputMode("append")
-      .format("memory")
-      .queryName("json_windowed_agg")
-      .trigger(Trigger.Once())
-      .option("checkpointLocation", "./tmp/checkpoints/json_windowed_agg_test")
-      .start()
-    query.processAllAvailable()
-    query.awaitTermination()
-
-    val result = spark.sql("select word, count from json_windowed_agg").collect().map(_.getString(0)).toSet
-    assert(result.contains("foo"))
-    assert(!result.contains("bar"), "Late row 'bar' should be dropped")
-    spark.stop()
+    try f(spark) finally spark.stop()
+  }
+
+  test("file JSON source: sequential writes close windows via watermark (append mode)") {
+    withSpark { spark =>
+      import spark.implicits._
+
+      val inputDir = Files.createTempDirectory("json-input-it").toFile.getAbsolutePath
+      val chkDir = Files.createTempDirectory("chk-it").toFile.getAbsolutePath
+      val qName = "json_winagg_mem_it"
+
+      // Start the stream FIRST, using a periodic trigger and a watermark
+      val q = JsonWindowedAggExample.makeQueryWith(
+        spark,
+        inputPath = inputDir,
+        checkpointDir = chkDir,
+        outputFormat = "memory", // assertable sink
+        queryName = Some(qName),
+        trigger = Trigger.ProcessingTime("250 milliseconds"),
+        addWatermark = true // watermark = 5 minutes (set in builder)
+      )
+
+      // --- Batch 1: events in [10:00,10:10)
+      Seq(
+        ("2025-01-01 10:01:00", "hello"),
+        ("2025-01-01 10:05:00", "hello"),
+        ("2025-01-01 10:05:00", "world")
+      ).map { case (ts, w) => (Timestamp.valueOf(ts), w) }
+        .toDF("timestamp", "word")
+        .write.mode(SaveMode.Append).json(inputDir)
+
+      // Let the stream pick up batch 1
+      q.processAllAvailable() // ok in tests
+
+      // Nothing should be emitted yet in append mode (window not closed)
+      assert(spark.table(qName).count() == 0)
+
+      // --- Batch 2: later event at 10:16 moves max event time to 10:16
+      // Watermark = maxEventTime - 5m = 10:11 >= 10:10, so [10:00,10:10) closes and emits.
+      Seq(("2025-01-01 10:16:00", "hello"))
+        .map { case (ts, w) => (Timestamp.valueOf(ts), w) }
+        .toDF("timestamp", "word")
+        .write.mode(SaveMode.Append).json(inputDir)
+
+      q.processAllAvailable()
+
+      val afterBatch2 = spark.table(qName)
+        .select(
+          date_format(col("window.start"), "yyyy-MM-dd HH:mm:ss").as("start"),
+          date_format(col("window.end"), "yyyy-MM-dd HH:mm:ss").as("end"),
+          col("word"),
+          col("count")
+        )
+        .collect()
+        .map(r => (r.getString(0), r.getString(1), r.getString(2), r.getLong(3)))
+        .toSet
+
+      val expectedAfterBatch2 = Set(
+        ("2025-01-01 10:00:00", "2025-01-01 10:10:00", "hello", 2L),
+        ("2025-01-01 10:00:00", "2025-01-01 10:10:00", "world", 1L)
+      )
+      assert(afterBatch2 == expectedAfterBatch2)
+
+      // --- Batch 3: event at 10:26 closes [10:10,10:20)
+      // New watermark = 10:21 >= 10:20 ⇒ the second window can now emit.
+      Seq(("2025-01-01 10:26:00", "noop"))
+        .map { case (ts, w) => (Timestamp.valueOf(ts), w) }
+        .toDF("timestamp", "word")
+        .write.mode(SaveMode.Append).json(inputDir)
+
+      q.processAllAvailable()
+
+      val finalOut = spark.table(qName)
+        .select(
+          date_format(col("window.start"), "yyyy-MM-dd HH:mm:ss").as("start"),
+          date_format(col("window.end"), "yyyy-MM-dd HH:mm:ss").as("end"),
+          col("word"),
+          col("count")
+        )
+        .collect()
+        .map(r => (r.getString(0), r.getString(1), r.getString(2), r.getLong(3)))
+        .toSet
+
+      val expectedFinal = expectedAfterBatch2 ++ Set(
+        ("2025-01-01 10:10:00", "2025-01-01 10:20:00", "hello", 1L)
+      )
+      assert(finalOut == expectedFinal)
+
+      q.stop()
+    }
   }
 }
-// end::streaming_ex_json_window_test[]
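The two collect-and-normalize blocks in the new test are identical, so a small helper along the following lines could remove the duplication. This is a hypothetical refactor sketch, not part of the commit; the object and method names are made up for illustration.

package com.highperformancespark.examples.structuredstreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WindowedAggTestHelpers {
  /** Read the in-memory sink table registered under queryName and normalize each
    * row to (window start, window end, word, count) with string timestamps. */
  def collectWindows(spark: SparkSession, queryName: String): Set[(String, String, String, Long)] = {
    spark.table(queryName)
      .select(
        date_format(col("window.start"), "yyyy-MM-dd HH:mm:ss").as("start"),
        date_format(col("window.end"), "yyyy-MM-dd HH:mm:ss").as("end"),
        col("word"),
        col("count")
      )
      .collect()
      .map(r => (r.getString(0), r.getString(1), r.getString(2), r.getLong(3)))
      .toSet
  }
}

With such a helper the assertions would collapse to assert(collectWindows(spark, qName) == expectedAfterBatch2) and assert(collectWindows(spark, qName) == expectedFinal).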

native/src/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
@@ -14,6 +14,8 @@ option(SBT "Set if invoked from sbt-jni" OFF)
 #
 project (high-performance-spark)
 enable_language(Fortran)
+enable_language(C)
+enable_language(CXX)
 set(PROJECT_VERSION_MAJOR 0)
 set(PROJECT_VERSION_MINOR 0)
 set(PROJECT_VERSION_PATCH 0)
@@ -41,6 +43,9 @@ endif()
 #end::velox[]
 
 # Setup JNI
+if(DEFINED ENV{JAVA_HOME})
+  set(JAVA_HOME "$ENV{JAVA_HOME}" CACHE PATH "JAVA_HOME for JNI discovery")
+endif()
 find_package(JNI REQUIRED)
 if (JNI_FOUND)
   message (STATUS "JNI include directories: ${JNI_INCLUDE_DIRS}")
