From f8da15c585294f94182b4a6f8d100819f5608a52 Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Thu, 4 Sep 2025 21:00:07 +0530 Subject: [PATCH 1/8] [SPARK-53491][SS] Fix exponential formatting of inputRowsPerSecond and processedRowsPerSecond in StreamProgressMetrics json --- .../apache/spark/sql/streaming/progress.scala | 21 ++++++++++---- ...StreamingQueryStatusAndProgressSuite.scala | 29 +++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index b7573cb280444..1a58cded86612 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -22,6 +22,7 @@ import java.lang.{Long => JLong} import java.util.UUID import scala.jdk.CollectionConverters._ +import scala.math.BigDecimal.RoundingMode import scala.util.control.NonFatal import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} @@ -35,7 +36,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Evolving import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue} +import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDecimalToJValue, safeMapToJValue} import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS /** @@ -183,8 +184,8 @@ class StreamingQueryProgress private[spark] ( ("batchId" -> JInt(batchId)) ~ ("batchDuration" -> JInt(batchDuration)) ~ ("numInputRows" -> JInt(numInputRows)) ~ - ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ - ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ + ("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~ + ("processedRowsPerSecond" -> safeDecimalToJValue(processedRowsPerSecond)) ~ ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~ ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~ ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~ @@ -255,8 +256,8 @@ class SourceProgress protected[spark] ( ("endOffset" -> tryParse(endOffset)) ~ ("latestOffset" -> tryParse(latestOffset)) ~ ("numInputRows" -> JInt(numInputRows)) ~ - ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ - ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ + ("inputRowsPerSecond" -> safeDecimalToJValue(inputRowsPerSecond)) ~ + ("processedRowsPerSecond" -> safeDecimalToJValue(processedRowsPerSecond)) ~ ("metrics" -> safeMapToJValue[String](metrics, s => JString(s))) } @@ -316,6 +317,7 @@ private[sql] object SinkProgress { } private object SafeJsonSerializer { + /** Convert Double to JValue while handling empty or infinite values */ def safeDoubleToJValue(value: Double): JValue = { if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } @@ -326,4 +328,13 @@ private object SafeJsonSerializer { val keys = map.asScala.keySet.toSeq.sorted keys.map { k => k -> valueToJValue(map.get(k)): JObject }.reduce(_ ~ _) } + + /** Convert BigDecimal to JValue while handling empty or infinite values */ + def safeDecimalToJValue(value: Double): JValue = { + if (value.isNaN || value.isInfinity) JNothing + else { + val valueWithScale = BigDecimal(value).setScale(1, RoundingMode.HALF_UP) + JDecimal(valueWithScale) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index e748ae8e7d7df..6d1fa34f815ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -400,6 +400,35 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { assert(data(0).getAs[Timestamp](0).equals(validValue)) } + test("SPARK-53491: `inputRowsPerSecond` and `processedRowsPerSecond` " + + "should never be with scientific notation") { + import testImplicits._ + + val inputData = MemoryStream[Int] + val df = inputData.toDF() + val query = df.writeStream + .format("memory") + .queryName("TestFormatting") + .start() + + try { + val bigBatch = 1 to 900000 + inputData.addData(bigBatch: _*) + + query.processAllAvailable() + + val progress = query.lastProgress.jsonValue + + print(progress) + + assert(!(progress \ "inputRowsPerSecond").values.toString.contains("E")) + assert(!(progress \ "processedRowsPerSecond").values.toString.contains("E")) + } finally { + query.stop() + spark.streams.awaitAnyTermination(1000) // Waiting to allow cleaning all threads + } + } + def waitUntilBatchProcessed: AssertOnQuery = Execute { q => eventually(Timeout(streamingTimeout)) { if (q.exception.isEmpty) { From 383bef4ce035fb9acf57b609421a9beadc577a69 Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Thu, 4 Sep 2025 23:06:56 +0530 Subject: [PATCH 2/8] [SPARK-53491][SS] Removed backquotes from test name and used Matchers instead of assert --- .../StreamingQueryStatusAndProgressSuite.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 6d1fa34f815ba..cff1b909841d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -27,6 +27,7 @@ import scala.jdk.CollectionConverters._ import org.json4s.jackson.JsonMethods._ import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.matchers.should.Matchers import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.Row @@ -40,7 +41,7 @@ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.types.StructType import org.apache.spark.util.ArrayImplicits._ -class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { +class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually with Matchers { test("StreamingQueryProgress - prettyJson") { val json1 = testProgress1.prettyJson assertJson( @@ -400,7 +401,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { assert(data(0).getAs[Timestamp](0).equals(validValue)) } - test("SPARK-53491: `inputRowsPerSecond` and `processedRowsPerSecond` " + + test("SPARK-53491: inputRowsPerSecond and processedRowsPerSecond " + "should never be with scientific notation") { import testImplicits._ @@ -419,10 +420,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { val progress = query.lastProgress.jsonValue - print(progress) - - assert(!(progress \ "inputRowsPerSecond").values.toString.contains("E")) - assert(!(progress \ "processedRowsPerSecond").values.toString.contains("E")) + (progress \ "inputRowsPerSecond").values.toString should not include "E" + (progress \ "processedRowsPerSecond").values.toString should not include "E" } finally { query.stop() spark.streams.awaitAnyTermination(1000) // Waiting to allow cleaning all threads From 2e7bf4c61fbcf1b6466d310e6358f062b3f2f49a Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Fri, 5 Sep 2025 11:28:01 +0530 Subject: [PATCH 3/8] [SPARK-53491][SS] Added comments in the test case, refactored the code. --- .../org/apache/spark/sql/streaming/progress.scala | 4 +++- .../StreamingQueryStatusAndProgressSuite.scala | 12 ++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 1a58cded86612..4f07fb174cb8f 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -331,7 +331,9 @@ private object SafeJsonSerializer { /** Convert BigDecimal to JValue while handling empty or infinite values */ def safeDecimalToJValue(value: Double): JValue = { - if (value.isNaN || value.isInfinity) JNothing + if (value.isNaN || value.isInfinity) { + JNothing + } else { val valueWithScale = BigDecimal(value).setScale(1, RoundingMode.HALF_UP) JDecimal(valueWithScale) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index cff1b909841d2..b670691c246b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -409,18 +409,26 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi val df = inputData.toDF() val query = df.writeStream .format("memory") - .queryName("TestFormatting") + .queryName("SPARK_53491_test_formatting") .start() try { - val bigBatch = 1 to 900000 + // Submitting a very large batch to inputStream + // Using bigger range may repro the issue easily + // However, that will also cause OOM while running the test case. + // Hence limiting the range. + val bigBatch = 1 to 600000 inputData.addData(bigBatch: _*) query.processAllAvailable() + // JSON representation of the latest query progress val progress = query.lastProgress.jsonValue + // This should fail if inputRowsPerSecond contains E notiation (progress \ "inputRowsPerSecond").values.toString should not include "E" + + // This should fail if processedRowsPerSecond contains E notiation (progress \ "processedRowsPerSecond").values.toString should not include "E" } finally { query.stop() From c1a63ccc509a85a82a50efe9ac371a05525d2f8a Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Fri, 5 Sep 2025 16:03:37 +0530 Subject: [PATCH 4/8] [SPARK-53491][SS] Simplified the test case. --- ...StreamingQueryStatusAndProgressSuite.scala | 72 +++++++++++-------- 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index b670691c246b7..9ab5ecfd65af2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -403,37 +403,13 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi test("SPARK-53491: inputRowsPerSecond and processedRowsPerSecond " + "should never be with scientific notation") { - import testImplicits._ - - val inputData = MemoryStream[Int] - val df = inputData.toDF() - val query = df.writeStream - .format("memory") - .queryName("SPARK_53491_test_formatting") - .start() + val progress = testProgress4.jsonValue - try { - // Submitting a very large batch to inputStream - // Using bigger range may repro the issue easily - // However, that will also cause OOM while running the test case. - // Hence limiting the range. - val bigBatch = 1 to 600000 - inputData.addData(bigBatch: _*) + // This should fail if inputRowsPerSecond contains E notation + (progress \ "inputRowsPerSecond").values.toString should not include "E" - query.processAllAvailable() - - // JSON representation of the latest query progress - val progress = query.lastProgress.jsonValue - - // This should fail if inputRowsPerSecond contains E notiation - (progress \ "inputRowsPerSecond").values.toString should not include "E" - - // This should fail if processedRowsPerSecond contains E notiation - (progress \ "processedRowsPerSecond").values.toString should not include "E" - } finally { - query.stop() - spark.streams.awaitAnyTermination(1000) // Waiting to allow cleaning all threads - } + // This should fail if processedRowsPerSecond contains E notation + (progress \ "processedRowsPerSecond").values.toString should not include "E" } def waitUntilBatchProcessed: AssertOnQuery = Execute { q => @@ -558,6 +534,44 @@ object StreamingQueryStatusAndProgressSuite { observedMetrics = null ) + val testProgress4 = new StreamingQueryProgress( + id = UUID.randomUUID, + runId = UUID.randomUUID, + name = "myName", + timestamp = "2025-09-05T20:54:20.827Z", + batchId = 2L, + batchDuration = 0L, + durationMs = new java.util.HashMap(Map("total" -> 0L).transform((_, v) => long2Long(v)).asJava), + eventTime = new java.util.HashMap(Map( + "max" -> "2025-09-05T20:54:20.827Z", + "min" -> "2025-09-05T20:54:20.827Z", + "avg" -> "2025-09-05T20:54:20.827Z", + "watermark" -> "2025-09-05T20:54:20.827Z").asJava), + stateOperators = Array(new StateOperatorProgress(operatorName = "op1", + numRowsTotal = 0, numRowsUpdated = 1, allUpdatesTimeMs = 1, numRowsRemoved = 2, + allRemovalsTimeMs = 34, commitTimeMs = 23, memoryUsedBytes = 3, numRowsDroppedByWatermark = 0, + numShufflePartitions = 2, numStateStoreInstances = 2, + customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L, + "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L) + .transform((_, v) => long2Long(v)).asJava) + )), + sources = Array( + new SourceProgress( + description = "source", + startOffset = "123", + endOffset = "456", + latestOffset = "789", + numInputRows = 678, + inputRowsPerSecond = 6.923076923076923E7, // Large double value having exponentials + processedRowsPerSecond = 2.923076923076923E7 + ) + ), + sink = SinkProgress("sink", None), + observedMetrics = new java.util.HashMap(Map( + "event1" -> row(schema1, 1L, 3.0d), + "event2" -> row(schema2, 1L, "hello", "world")).asJava) + ) + val testStatus = new StreamingQueryStatus("active", true, false) } From d838002d2018a01b3e7f2161354ebe9d0ed77a05 Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Tue, 9 Sep 2025 06:44:40 +0530 Subject: [PATCH 5/8] [SPARK-53491][SS] Fixed brackets formatting as per scalastyle. --- .../scala/org/apache/spark/sql/streaming/progress.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 4f07fb174cb8f..55f7f5c08cb51 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -331,12 +331,8 @@ private object SafeJsonSerializer { /** Convert BigDecimal to JValue while handling empty or infinite values */ def safeDecimalToJValue(value: Double): JValue = { - if (value.isNaN || value.isInfinity) { - JNothing - } - else { - val valueWithScale = BigDecimal(value).setScale(1, RoundingMode.HALF_UP) - JDecimal(valueWithScale) + if (value.isNaN || value.isInfinity) { JNothing } else { + JDecimal(BigDecimal(value).setScale(1, RoundingMode.HALF_UP)) } } } From 09ddad23cde43edef4e7f7f49f35df6de2cdcc00 Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Tue, 9 Sep 2025 07:46:18 +0530 Subject: [PATCH 6/8] [SPARK-53491][SS] Fixed brackets formatting as per scalastyle. --- .../main/scala/org/apache/spark/sql/streaming/progress.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 55f7f5c08cb51..534209eb8d867 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -331,8 +331,7 @@ private object SafeJsonSerializer { /** Convert BigDecimal to JValue while handling empty or infinite values */ def safeDecimalToJValue(value: Double): JValue = { - if (value.isNaN || value.isInfinity) { JNothing } else { - JDecimal(BigDecimal(value).setScale(1, RoundingMode.HALF_UP)) - } + if (value.isNaN || value.isInfinity) JNothing + else JDecimal(BigDecimal(value).setScale(1, RoundingMode.HALF_UP)) } } From aac5caf79f28eda694f44ebdc6151aa8edace8d4 Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Tue, 9 Sep 2025 09:35:51 +0530 Subject: [PATCH 7/8] [SPARK-53491][SS] Reformatted for Run / Linters, licenses, and dependencies build check. --- .../src/main/scala/org/apache/spark/sql/streaming/progress.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 534209eb8d867..1c6be6c2b1f0e 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -317,6 +317,7 @@ private[sql] object SinkProgress { } private object SafeJsonSerializer { + /** Convert Double to JValue while handling empty or infinite values */ def safeDoubleToJValue(value: Double): JValue = { if (value.isNaN || value.isInfinity) JNothing else JDouble(value) From 8ce3bfac7333c72166a2460cf4eab4d1f25edd41 Mon Sep 17 00:00:00 2001 From: "jayant.sharma" Date: Thu, 11 Sep 2025 10:32:00 +0530 Subject: [PATCH 8/8] [SPARK-53491][SS] Added epsilon to compare floating-point values to validate accuracy. --- ...StreamingQueryStatusAndProgressSuite.scala | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 9ab5ecfd65af2..3a1dfd3088fa5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -23,6 +23,7 @@ import java.time.temporal.ChronoUnit import java.util.UUID import scala.jdk.CollectionConverters._ +import scala.math.BigDecimal.RoundingMode import org.json4s.jackson.JsonMethods._ import org.scalatest.concurrent.Eventually @@ -405,11 +406,34 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi "should never be with scientific notation") { val progress = testProgress4.jsonValue + // Actual values + val inputRowsPerSecond: Double = 6.923076923076923E8 + val processedRowsPerSecond: Double = 2.923076923076923E8 + + // Get values from progress metrics JSON and cast back to Double + // for numeric comparison + val inputRowsPerSecondJSON = (progress \ "inputRowsPerSecond").values.toString + .toDouble + val processedRowsPerSecondJSON = (progress \ "processedRowsPerSecond").values.toString + .toDouble + + // Get expected values after type casting + val inputRowsPerSecondExpected = BigDecimal(inputRowsPerSecond) + .setScale(1, RoundingMode.HALF_UP).toDouble + val processedRowsPerSecondExpected = BigDecimal(processedRowsPerSecond) + .setScale(1, RoundingMode.HALF_UP).toDouble + // This should fail if inputRowsPerSecond contains E notation (progress \ "inputRowsPerSecond").values.toString should not include "E" // This should fail if processedRowsPerSecond contains E notation (progress \ "processedRowsPerSecond").values.toString should not include "E" + + // Value in progress metrics should be equal to the Decimal conversion of the same + // Using epsilon to compare floating-point values + val epsilon = 1e-6 + inputRowsPerSecondJSON shouldBe inputRowsPerSecondExpected +- epsilon + processedRowsPerSecondJSON shouldBe processedRowsPerSecondExpected +- epsilon } def waitUntilBatchProcessed: AssertOnQuery = Execute { q => @@ -562,8 +586,8 @@ object StreamingQueryStatusAndProgressSuite { endOffset = "456", latestOffset = "789", numInputRows = 678, - inputRowsPerSecond = 6.923076923076923E7, // Large double value having exponentials - processedRowsPerSecond = 2.923076923076923E7 + inputRowsPerSecond = 6.923076923076923E8, // Large double value having exponentials + processedRowsPerSecond = 2.923076923076923E8 ) ), sink = SinkProgress("sink", None),