Skip to content

Commit 92f3cb6

Browse files
committed
Fix test for complete output mode
1 parent 42e37ec commit 92f3cb6

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

core/src/main/scala/com/high-performance-spark-examples/streaming/structuredstreaming/JsonWindowedAggExample.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ object JsonWindowedAggExample {
2727
checkpointDir = "/tmp/checkpoints/json_windowed_agg",
2828
outputFormat = "console",
2929
queryName = None,
30-
trigger = Trigger.ProcessingTime("5 seconds"),
31-
addWatermark = false
30+
trigger = Trigger.ProcessingTime("5 seconds")
3231
)
3332
}
3433

@@ -39,23 +38,23 @@ object JsonWindowedAggExample {
3938
checkpointDir: String,
4039
outputFormat: String,
4140
queryName: Option[String],
42-
trigger: Trigger,
43-
addWatermark: Boolean
41+
trigger: Trigger
4442
): StreamingQuery = {
4543
import spark.implicits._
4644

45+
// tag::streaming_ex_json_window[]
4746
val df = spark.readStream
4847
.format("json")
4948
.schema("timestamp TIMESTAMP, word STRING")
5049
.load(inputPath)
5150

52-
val base = if (addWatermark) df.withWatermark("timestamp", "5 minutes") else df
53-
val windowed = base
51+
val windowed = df
5452
.groupBy(window(col("timestamp"), "10 minutes"), col("word"))
5553
.count()
54+
// end::streaming_ex_json_window[]
5655

5756
val writer = windowed.writeStream
58-
.outputMode("append")
57+
.outputMode("complete") // Append would need a watermark
5958
.format(outputFormat)
6059
.option("checkpointLocation", checkpointDir)
6160
.trigger(trigger)

core/src/test/scala/com/high-performance-spark-examples/streaming/structuredstreaming/JsonWindowedAggExampleSuite.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ class JsonWindowedAggExampleFileIT extends AnyFunSuite {
3535
outputFormat = "memory", // assertable sink
3636
queryName = Some(qName),
3737
trigger = Trigger.ProcessingTime("250 milliseconds"),
38-
addWatermark = true // watermark = 5 minutes (set in builder)
3938
)
4039

4140
// --- Batch 1: events in [10:00,10:10)
@@ -50,8 +49,10 @@ class JsonWindowedAggExampleFileIT extends AnyFunSuite {
5049
// Let the stream pick up batch 1
5150
q.processAllAvailable() // ok in tests
5251

53-
// Nothing should be emitted yet in append mode (window not closed)
54-
assert(spark.table(qName).count() == 0)
52+
val initialCount = spark.table(qName).count()
53+
54+
// We're running in complete mode, so we should already see some records.
55+
assert(initialCount > 0)
5556

5657
// --- Batch 2: later event at 10:16 moves max event time to 10:16
5758
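// No watermark is configured and the sink runs in complete mode, so the whole result table is
// re-emitted: [10:00,10:10) is still present and the new [10:10,10:20) window appears as well.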
@@ -75,7 +76,8 @@ class JsonWindowedAggExampleFileIT extends AnyFunSuite {
7576

7677
val expectedAfterBatch2 = Set(
7778
("2025-01-01 10:00:00", "2025-01-01 10:10:00", "hello", 2L),
78-
("2025-01-01 10:00:00", "2025-01-01 10:10:00", "world", 1L)
79+
("2025-01-01 10:00:00", "2025-01-01 10:10:00", "world", 1L),
80+
("2025-01-01 10:10:00", "2025-01-01 10:20:00", "hello", 1L)
7981
)
8082
assert(afterBatch2 == expectedAfterBatch2)
8183

@@ -100,7 +102,8 @@ class JsonWindowedAggExampleFileIT extends AnyFunSuite {
100102
.toSet
101103

102104
val expectedFinal = expectedAfterBatch2 ++ Set(
103-
("2025-01-01 10:10:00", "2025-01-01 10:20:00", "hello", 1L)
105+
("2025-01-01 10:10:00", "2025-01-01 10:20:00", "hello", 1L),
106+
("2025-01-01 10:20:00", "2025-01-01 10:30:00", "noop", 1),
104107
)
105108
assert(finalOut == expectedFinal)
106109

0 commit comments

Comments
 (0)