Skip to content

Commit 8178cb0

Browse files
jayantdbHeartSaVioR
authored andcommitted
[SPARK-53491][SS] Fix exponential formatting of inputRowsPerSecond and processedRowsPerSecond in progress metrics JSON
### What changes were proposed in this pull request? This PR fixes an issue where `inputRowsPerSecond` and `processedRowsPerSecond` in streaming progress metrics JSON were displayed in scientific notation (e.g., `1.9871777605776876E8`). The fix uses safe `Decimal` casting to ensure values are displayed in a more human-readable format. ### Results Before change ``` { "id" : "9b512179-ea36-4b98-9d79-049d13813894", "runId" : "f85e2894-9582-493d-9b94-ce03e5490241", "name" : "TestFormatting", "timestamp" : "2025-09-04T10:57:02.897Z", "batchId" : 0, "batchDuration" : 1410, "numInputRows" : 900000, "inputRowsPerSecond" : 6.923076923076923E7, "processedRowsPerSecond" : 638297.8723404256, "durationMs" : { "addBatch" : 1101, "commitOffsets" : 157, "getBatch" : 0, "latestOffset" : 0, "queryPlanning" : 3, "triggerExecution" : 1410, "walCommit" : 149 }, "stateOperators" : [ ], "sources" : [ { "description" : "MemoryStream[value#133]", "startOffset" : null, "endOffset" : 0, "latestOffset" : null, "numInputRows" : 900000, "inputRowsPerSecond" : 6.923076923076923E7, "processedRowsPerSecond" : 638297.8723404256 } ], "sink" : { "description" : "MemorySink", "numOutputRows" : 900000 } } ``` After changes ``` { "id" : "03497c93-7ab7-4e14-ba5f-dadbfc8a4bf6", "runId" : "3933cdde-f99d-4a29-8bb8-d13bbb5df425", "name" : "TestFormatting", "timestamp" : "2025-09-04T15:50:45.500Z", "batchId" : 0, "batchDuration" : 1444, "numInputRows" : 900000, "inputRowsPerSecond" : 69230769.2, "processedRowsPerSecond" : 623268.7, "durationMs" : { "addBatch" : 1147, "commitOffsets" : 152, "getBatch" : 0, "latestOffset" : 0, "queryPlanning" : 3, "triggerExecution" : 1444, "walCommit" : 142 }, "stateOperators" : [ ], "sources" : [ { "description" : "MemoryStream[value#133]", "startOffset" : null, "endOffset" : 0, "latestOffset" : null, "numInputRows" : 900000, "inputRowsPerSecond" : 69230769.2, "processedRowsPerSecond" : 623268.7 } ], "sink" : { "description" : "MemorySink", "numOutputRows" : 900000 } } ``` ### Why are the changes needed? Improves the readability of Spark Structured Streaming progress metrics JSON. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Run this Maven test: ``` ./build/mvn -pl sql/core,sql/api \ -am test \ -DwildcardSuites=org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite \ -DwildcardTestName="SPARK-53491" ``` Results: ``` Run completed in 10 seconds, 680 milliseconds. Total number of tests run: 12 Suites: completed 2, aborted 0 Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #52237 from jayantdb/SPARK-53491-metrics-fix. Authored-by: jayant.sharma <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 1dab449 commit 8178cb0

File tree

2 files changed

+89
-6
lines changed

2 files changed

+89
-6
lines changed

sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.lang.{Long => JLong}
2222
import java.util.UUID
2323

2424
import scala.jdk.CollectionConverters._
25+
import scala.math.BigDecimal.RoundingMode
2526
import scala.util.control.NonFatal
2627

2728
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -35,7 +36,7 @@ import org.json4s.jackson.JsonMethods._
3536
import org.apache.spark.annotation.Evolving
3637
import org.apache.spark.sql.Row
3738
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
38-
import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
39+
import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDecimalToJValue, safeMapToJValue}
3940
import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
4041

4142
/**
@@ -183,8 +184,8 @@ class StreamingQueryProgress private[spark] (
183184
("batchId" -> JInt(batchId)) ~
184185
("batchDuration" -> JInt(batchDuration)) ~
185186
("numInputRows" -> JInt(numInputRows)) ~
186-
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
187-
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
187+
("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~
188+
("processedRowsPerSecond" -> safeDecimalToJValue(processedRowsPerSecond)) ~
188189
("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
189190
("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
190191
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
@@ -255,8 +256,8 @@ class SourceProgress protected[spark] (
255256
("endOffset" -> tryParse(endOffset)) ~
256257
("latestOffset" -> tryParse(latestOffset)) ~
257258
("numInputRows" -> JInt(numInputRows)) ~
258-
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
259-
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
259+
("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~
260+
("processedRowsPerSecond" -> safeDecimalToJValue(processedRowsPerSecond)) ~
260261
("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
261262
}
262263

@@ -316,6 +317,8 @@ private[sql] object SinkProgress {
316317
}
317318

318319
private object SafeJsonSerializer {
320+
321+
/** Convert Double to JValue while handling empty or infinite values */
319322
def safeDoubleToJValue(value: Double): JValue = {
320323
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
321324
}
@@ -326,4 +329,10 @@ private object SafeJsonSerializer {
326329
val keys = map.asScala.keySet.toSeq.sorted
327330
keys.map { k => k -> valueToJValue(map.get(k)): JObject }.reduce(_ ~ _)
328331
}
332+
333+
/** Convert BigDecimal to JValue while handling empty or infinite values */
334+
def safeDecimalToJValue(value: Double): JValue = {
335+
if (value.isNaN || value.isInfinity) JNothing
336+
else JDecimal(BigDecimal(value).setScale(1, RoundingMode.HALF_UP))
337+
}
329338
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ import java.time.temporal.ChronoUnit
2323
import java.util.UUID
2424

2525
import scala.jdk.CollectionConverters._
26+
import scala.math.BigDecimal.RoundingMode
2627

2728
import org.json4s.jackson.JsonMethods._
2829
import org.scalatest.concurrent.Eventually
2930
import org.scalatest.concurrent.PatienceConfiguration.Timeout
31+
import org.scalatest.matchers.should.Matchers
3032
import org.scalatest.time.SpanSugar._
3133

3234
import org.apache.spark.sql.Row
@@ -40,7 +42,7 @@ import org.apache.spark.sql.streaming.util.StreamManualClock
4042
import org.apache.spark.sql.types.StructType
4143
import org.apache.spark.util.ArrayImplicits._
4244

43-
class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
45+
class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually with Matchers {
4446
test("StreamingQueryProgress - prettyJson") {
4547
val json1 = testProgress1.prettyJson
4648
assertJson(
@@ -400,6 +402,40 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
400402
assert(data(0).getAs[Timestamp](0).equals(validValue))
401403
}
402404

405+
test("SPARK-53491: inputRowsPerSecond and processedRowsPerSecond " +
406+
"should never be with scientific notation") {
407+
val progress = testProgress4.jsonValue
408+
409+
// Actual values
410+
val inputRowsPerSecond: Double = 6.923076923076923E8
411+
val processedRowsPerSecond: Double = 2.923076923076923E8
412+
413+
// Get values from progress metrics JSON and cast back to Double
414+
// for numeric comparison
415+
val inputRowsPerSecondJSON = (progress \ "inputRowsPerSecond").values.toString
416+
.toDouble
417+
val processedRowsPerSecondJSON = (progress \ "processedRowsPerSecond").values.toString
418+
.toDouble
419+
420+
// Get expected values after type casting
421+
val inputRowsPerSecondExpected = BigDecimal(inputRowsPerSecond)
422+
.setScale(1, RoundingMode.HALF_UP).toDouble
423+
val processedRowsPerSecondExpected = BigDecimal(processedRowsPerSecond)
424+
.setScale(1, RoundingMode.HALF_UP).toDouble
425+
426+
// This should fail if inputRowsPerSecond contains E notation
427+
(progress \ "inputRowsPerSecond").values.toString should not include "E"
428+
429+
// This should fail if processedRowsPerSecond contains E notation
430+
(progress \ "processedRowsPerSecond").values.toString should not include "E"
431+
432+
// Value in progress metrics should be equal to the Decimal conversion of the same
433+
// Using epsilon to compare floating-point values
434+
val epsilon = 1e-6
435+
inputRowsPerSecondJSON shouldBe inputRowsPerSecondExpected +- epsilon
436+
processedRowsPerSecondJSON shouldBe processedRowsPerSecondExpected +- epsilon
437+
}
438+
403439
def waitUntilBatchProcessed: AssertOnQuery = Execute { q =>
404440
eventually(Timeout(streamingTimeout)) {
405441
if (q.exception.isEmpty) {
@@ -522,6 +558,44 @@ object StreamingQueryStatusAndProgressSuite {
522558
observedMetrics = null
523559
)
524560

561+
val testProgress4 = new StreamingQueryProgress(
562+
id = UUID.randomUUID,
563+
runId = UUID.randomUUID,
564+
name = "myName",
565+
timestamp = "2025-09-05T20:54:20.827Z",
566+
batchId = 2L,
567+
batchDuration = 0L,
568+
durationMs = new java.util.HashMap(Map("total" -> 0L).transform((_, v) => long2Long(v)).asJava),
569+
eventTime = new java.util.HashMap(Map(
570+
"max" -> "2025-09-05T20:54:20.827Z",
571+
"min" -> "2025-09-05T20:54:20.827Z",
572+
"avg" -> "2025-09-05T20:54:20.827Z",
573+
"watermark" -> "2025-09-05T20:54:20.827Z").asJava),
574+
stateOperators = Array(new StateOperatorProgress(operatorName = "op1",
575+
numRowsTotal = 0, numRowsUpdated = 1, allUpdatesTimeMs = 1, numRowsRemoved = 2,
576+
allRemovalsTimeMs = 34, commitTimeMs = 23, memoryUsedBytes = 3, numRowsDroppedByWatermark = 0,
577+
numShufflePartitions = 2, numStateStoreInstances = 2,
578+
customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
579+
"loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
580+
.transform((_, v) => long2Long(v)).asJava)
581+
)),
582+
sources = Array(
583+
new SourceProgress(
584+
description = "source",
585+
startOffset = "123",
586+
endOffset = "456",
587+
latestOffset = "789",
588+
numInputRows = 678,
589+
inputRowsPerSecond = 6.923076923076923E8, // Large double value having exponentials
590+
processedRowsPerSecond = 2.923076923076923E8
591+
)
592+
),
593+
sink = SinkProgress("sink", None),
594+
observedMetrics = new java.util.HashMap(Map(
595+
"event1" -> row(schema1, 1L, 3.0d),
596+
"event2" -> row(schema2, 1L, "hello", "world")).asJava)
597+
)
598+
525599
val testStatus = new StreamingQueryStatus("active", true, false)
526600
}
527601

0 commit comments

Comments
 (0)